zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Go语言使用定时器实现任务队列

Go队列语言队列 实现 使用 任务 定时器
2023-06-13 09:12:01 时间
Go语言中提供了两种定时器 timer 和 ticker,分别是一次性定时器和重复任务定时器。本节咱们主要介绍如何使用Go语言的定时器实现一个任务队列,非常具有实用价值。

Go语言中定时器

一般用法:


package main

import(

 fmt 

 time 

func main() {

 input := make(chan interface{})

 //producer - produce the messages

 go func() {

 for i := 0; i i++ {

 input - i

 input - hello, world 

 t1 := time.NewTimer(time.Second * 5)

 t2 := time.NewTimer(time.Second * 10)

 for {

 select {

 //consumer - consume the messages

 case msg := -input:

 fmt.Println(msg)

 case -t1.C:

 println( 5s timer )

 t1.Reset(time.Second * 5)

 case -t2.C:

 println( 10s timer )

 t2.Reset(time.Second * 10)

}

上面代码中的这个 C 是啥呢,我们去源码看看,以 timer 为例:

type Timer struct {
  C -chan Time
  r runtimeTimer
}

原来是一个 channel,其实有 GO 基础的都知道,GO 的运算符当出现的 - 或者 - 的时候,必然是有一端是指 channel。按照上面的例子来看,就是阻塞在一个 for 循环内,等待到了定时器的 C 从 channel 出来,当获取到值的时候,进行想要的操作。

设计我们的定时任务队列

当时的需求是这样,需要接收到客户端的请求并产生一个定时任务,会在固定时间执行,可能是一次,也可能是多次,也可能到指定时间自动停止,可能当任务终止的时候,还要能停止掉。

具体的流程如下图所示:

Go语言使用定时器实现任务队列

定义结构


type OnceCron struct {

 tasks []*Task //任务的列队

 add chan *Task //当遭遇到新任务的时候

 remove chan string //当遭遇到删除任务的时候

 stop chan struct{} //当遇到停止信号的时候

 Logger *log.Logger //日志

type Job interface {

 Run() //执行接口

type Task struct {

 Job Job //要执行的任务

 Uuid string //任务标识,删除时用

 RunTime int64 //执行时间

 Spacing int64 //间隔时间

 EndTime int64 //结束时间

 Number int //总共要次数

}

首先,我们要获得一个队列任务

func NewCron() *OnceCron常规操作,为了节省篇幅,就不写出来,具体可以看源码,贴在了底部。

然后,开始定时器队列的运行,一般,都会命名为 Start。那么就有一个问题,我们刚开始启动程序的时候,这个时候是没有任务队列,那岂不是 for{select{}} 在等待个毛毛球?所以,我们需要在 Start 的时候添加一个默认的任务,防止队列退出。


func (one *OnceCron) Start() {

 //初始化的时候加入一个一年的长定时器,间隔1小时执行一次

 task := getTaskWithFuncSpacing(3600, time.Now().Add(time.Hour*24*365).Unix() , func() {

 log.Println( It s a Hour timer! )

 }) //为了代码格式 markdown 里面有个括号改成全角了

 one.tasks = append(one.tasks, task)

 go one.run() //协成执行 防止主进程被阻塞

}

执行部分应该是重点的,分成三部:


remove: make(chan string), Logger: log.New(os.Stdout, [Control]: , log.Ldate|log.Ltime|log.Lshortfile), lock: false, //add spacing time job to list with number func (scheduler *TaskScheduler) AddFuncSpaceNumber(spaceTime int64, number int, f func()) { task := getTaskWithFuncSpacingNumber(spaceTime, number, f) scheduler.addTask(task) //add spacing time job to list with endTime func (scheduler *TaskScheduler) AddFuncSpace(spaceTime int64, endTime int64, f func()) { task := getTaskWithFuncSpacing(spaceTime, endTime, f) scheduler.addTask(task) //add func to list func (scheduler *TaskScheduler) AddFunc(unixTime int64, f func()) { task := getTaskWithFunc(unixTime, f) scheduler.addTask(task) func (scheduler *TaskScheduler) AddTaskInterface(task TaskInterface) { scheduler.addTask(task) //add a task to list func (scheduler *TaskScheduler) AddTask(task *Task) string { if task.RunTime != 0 { if task.RunTime 100000000000 { task.RunTime = task.RunTime * int64(time.Second) if task.RunTime time.Now().UnixNano() { //延遲1秒 task.RunTime = time.Now().UnixNano() + int64(time.Second) } else { if task.Spacing 0 { task.RunTime = time.Now().UnixNano() + task.Spacing * int64(time.Second) }else{ scheduler.Logger.Println( error too add task! Runtime error ) return if task.Uuid == { task.Uuid = uuid.New().String() return scheduler.addTask(task) //if lock add to swap func (scheduler *TaskScheduler) addTask(task TaskInterface) string { if scheduler.lock { scheduler.swap = append(scheduler.swap, task) scheduler.add - task } else{ scheduler.tasks = append(scheduler.tasks, task) scheduler.add - task return task.GetUuid() //new export func (scheduler *TaskScheduler) ExportInterface() []TaskInterface { return scheduler.tasks //compatible old export tasks func (scheduler *TaskScheduler) Export() []*Task { task := make([]*Task,0) for _,v := range scheduler.tasks { task = append(task, v.(*Task)) return task //stop task with uuid func (scheduler *TaskScheduler) StopOnce(uuidStr string) { scheduler.remove - uuidStr //run Cron func (scheduler *TaskScheduler) Start() { //初始化的时候加入一个一年的长定时器,间隔1小时执行一次 task := getTaskWithFuncSpacing(3600, time.Now().Add(time.Hour * 24 * 365).UnixNano(), func() { log.Println( It s a Hour timer! ) scheduler.tasks = append(scheduler.tasks, task) go scheduler.run() //stop all func (scheduler *TaskScheduler) Stop() { scheduler.stop - struct{}{} //run task list //if is empty, run a year timer task func (scheduler *TaskScheduler) run() { for { now := time.Now() task, key := scheduler.GetTask() runTime := task.GetRunTime() i64 := runTime - now.UnixNano() var d time.Duration if i64 0 { scheduler.tasks[key].SetRuntime(now.UnixNano()) if task != nil { go task.RunJob() scheduler.doAndReset(key) continue } else { sec := runTime / int64(time.Second) nsec := runTime % int64(time.Second) d = time.Unix(sec, nsec).Sub(now) timer := time.NewTimer(d) //catch a chan and do something for { select { //if time has expired do task and shift key if is task list case -timer.C: scheduler.doAndReset(key) if task != nil { //fmt.Println(scheduler.tasks[key]) go task.RunJob() timer.Stop() //if add task case -scheduler.add: timer.Stop() // remove task with remove uuid case uuidstr := -scheduler.remove: scheduler.removeTask(uuidstr) timer.Stop() //if get a stop single exit case -scheduler.stop: timer.Stop() return break //return a task and key In task list func (scheduler *TaskScheduler) GetTask() (task TaskGetInterface, tempKey int) { scheduler.Lock() defer scheduler.UnLock() min := scheduler.tasks[0].GetRunTime() tempKey = 0 for key, task := range scheduler.tasks { tTime := task.GetRunTime() if min = tTime { continue if min tTime { tempKey = key min = tTime continue task = scheduler.tasks[tempKey] return task, tempKey //if add a new task and runtime now task runtime // stop now timer and again func (scheduler *TaskScheduler) doAndReset(key int) { scheduler.Lock() defer scheduler.UnLock() //null pointer if key len(scheduler.tasks) { nowTask := scheduler.tasks[key] scheduler.tasks = append(scheduler.tasks[:key], scheduler.tasks[key+1:]...) if nowTask.GetSpacing() 0 { tTime := nowTask.GetRunTime() nowTask.SetRuntime(nowTask.GetSpacing() * int64(time.Second) + tTime) number := nowTask.GetRunNumber() if number 1 { nowTask.SetRunNumber(number - 1) scheduler.tasks = append(scheduler.tasks, nowTask) } else if nowTask.GetEndTime() = tTime { scheduler.tasks = append(scheduler.tasks, nowTask)
if task.GetUuid() == uuidStr { scheduler.tasks = append(scheduler.tasks[:key], scheduler.tasks[key+1:]...) break //lock task [] func (scheduler *TaskScheduler) Lock() { scheduler.lock = true //unlock task [] func (scheduler *TaskScheduler) UnLock() { scheduler.lock = false if len(scheduler.swap) 0 { for _, task := range scheduler.swap { scheduler.tasks = append(scheduler.tasks, task) scheduler.swap = make([]TaskInterface, 0) }

23123.html

go