Skip to content

Latest commit

 

History

History
201 lines (155 loc) · 4.5 KB

File metadata and controls

201 lines (155 loc) · 4.5 KB

timer

Go implementation of Kafka's Hierarchical Timing Wheels.

Go.Dev reference codecov Tests Go Report Card License Tag

Feature

  • Unlimited hierarchical wheel.
  • insert, delete, scan task almost O(1).
  • Different from the time wheel of Linux, it has no maximum time limit.
  • It is not advancing per TickMs, it uses DelayQueue to directly take out the most recently expired Spoke, and then advances to the expiration time of the Spoke in one step, preventing empty advances.
  • built-in a global timer instance, that tick is 1ms. wheel size is 128, use ants goroutine pool.

Usage

Installation

Use go get.

    go get github.com/thinkgos/timer

Then import the package into your own code.

    import "github.com/thinkgos/timer"

Example

monitor

package main

import (
	"log"
	"math"
	"math/rand/v2"
	"net/http"
	"sync/atomic"
	"time"

	_ "net/http/pprof"

	"github.com/thinkgos/timer"
)

// almost 1,000,000 task
func main() {
	go func() {
		sum := &atomic.Int64{}
		t := time.NewTicker(time.Second)
		for {
			<-t.C
			added := 0
			ranv := rand.IntN(10)
			max := int(rand.Uint32N(math.MaxUint16 << 2))
			for i := 100; i < max; i += 200 {
				added++
				ii := i + ranv

				timer.Go(func() {
					sum.Add(1)
					delayms := int64(ii) * 20
					task := timer.NewTask(time.Duration(delayms) * time.Millisecond).WithJob(&job{
						sum:          sum,
						expirationMs: time.Now().UnixMilli() + delayms,
					})
					timer.AddTask(task)

					// for test race
					// if ii%0x03 == 0x00 {
					// 	timer.Go(func() {
					// 		task.Cancel()
					// 	})
					// }
				})
			}
			log.Printf("task: %v - %v added: %d", timer.TaskCounter(), sum.Load(), added)
		}
	}()

	addr := ":9990"
	log.Printf("http stated '%v'\n", addr)
	log.Println(http.ListenAndServe(addr, nil))
}

type job struct {
	sum          *atomic.Int64
	expirationMs int64
}

func (j *job) Run() {
	j.sum.Add(-1)
	now := time.Now().UnixMilli()
	if diff := now - j.expirationMs; diff > 1 {
		log.Printf("this task no equal, diff: %d %d %d\n", now, j.expirationMs, diff)
	}
}

repetition

package main

import (
	"fmt"
	"time"

	"github.com/thinkgos/timer"
)

// one or two second delay repetition example
func main() {
	job := NewRepetitionJob()
	_ = timer.AddDerefTask(job)
	select {}
}

type RepetitionJob struct {
	task *timer.Task
	i    int
}

var _ timer.TaskContainer = (*RepetitionJob)(nil)

func NewRepetitionJob() *RepetitionJob {
	j := &RepetitionJob{
		task: timer.NewTask(time.Second),
		i:    1,
	}
	j.task.WithJob(j)
	return j
}

func (j *RepetitionJob) Run() {
	now := time.Now().String()
	j.i++
	_ = timer.AddTask(j.task.SetDelay(time.Second * time.Duration((j.i%2 + 1))))
	fmt.Printf("%s: repetition executed,\n", now)
}

func (j *RepetitionJob) DerefTask() *timer.Task { return j.task }

sample

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/thinkgos/timer"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		index := i
		_, _ = timer.AfterFunc(time.Duration(i)*100*time.Millisecond, func() {
			fmt.Printf("%s: timer task %d is executed, remain task: %d\n", time.Now().String(), index, timer.TaskCounter())
			wg.Done()
		})
	}
	wg.Wait()
}

How it works

References

License

This project is under MIT License. See the LICENSE file for the full license text.