Go语言使用定时器实现任务队列
2023-06-13 09:12:01 时间
Go语言中提供了两种定时器 timer 和 ticker,分别是一次性定时器和重复任务定时器。本节咱们主要介绍如何使用Go语言的定时器实现一个任务队列,非常具有实用价值。
Go语言中定时器
remove: make(chan string), Logger: log.New(os.Stdout, [Control]: , log.Ldate|log.Ltime|log.Lshortfile), lock: false, //add spacing time job to list with number func (scheduler *TaskScheduler) AddFuncSpaceNumber(spaceTime int64, number int, f func()) { task := getTaskWithFuncSpacingNumber(spaceTime, number, f) scheduler.addTask(task) //add spacing time job to list with endTime func (scheduler *TaskScheduler) AddFuncSpace(spaceTime int64, endTime int64, f func()) { task := getTaskWithFuncSpacing(spaceTime, endTime, f) scheduler.addTask(task) //add func to list func (scheduler *TaskScheduler) AddFunc(unixTime int64, f func()) { task := getTaskWithFunc(unixTime, f) scheduler.addTask(task) func (scheduler *TaskScheduler) AddTaskInterface(task TaskInterface) { scheduler.addTask(task) //add a task to list func (scheduler *TaskScheduler) AddTask(task *Task) string { if task.RunTime != 0 { if task.RunTime 100000000000 { task.RunTime = task.RunTime * int64(time.Second) if task.RunTime time.Now().UnixNano() { //延遲1秒 task.RunTime = time.Now().UnixNano() + int64(time.Second) } else { if task.Spacing 0 { task.RunTime = time.Now().UnixNano() + task.Spacing * int64(time.Second) }else{ scheduler.Logger.Println( error too add task! 
Runtime error ) return if task.Uuid == { task.Uuid = uuid.New().String() return scheduler.addTask(task) //if lock add to swap func (scheduler *TaskScheduler) addTask(task TaskInterface) string { if scheduler.lock { scheduler.swap = append(scheduler.swap, task) scheduler.add - task } else{ scheduler.tasks = append(scheduler.tasks, task) scheduler.add - task return task.GetUuid() //new export func (scheduler *TaskScheduler) ExportInterface() []TaskInterface { return scheduler.tasks //compatible old export tasks func (scheduler *TaskScheduler) Export() []*Task { task := make([]*Task,0) for _,v := range scheduler.tasks { task = append(task, v.(*Task)) return task //stop task with uuid func (scheduler *TaskScheduler) StopOnce(uuidStr string) { scheduler.remove - uuidStr //run Cron func (scheduler *TaskScheduler) Start() { //初始化的时候加入一个一年的长定时器,间隔1小时执行一次 task := getTaskWithFuncSpacing(3600, time.Now().Add(time.Hour * 24 * 365).UnixNano(), func() { log.Println( It s a Hour timer! ) scheduler.tasks = append(scheduler.tasks, task) go scheduler.run() //stop all func (scheduler *TaskScheduler) Stop() { scheduler.stop - struct{}{} //run task list //if is empty, run a year timer task func (scheduler *TaskScheduler) run() { for { now := time.Now() task, key := scheduler.GetTask() runTime := task.GetRunTime() i64 := runTime - now.UnixNano() var d time.Duration if i64 0 { scheduler.tasks[key].SetRuntime(now.UnixNano()) if task != nil { go task.RunJob() scheduler.doAndReset(key) continue } else { sec := runTime / int64(time.Second) nsec := runTime % int64(time.Second) d = time.Unix(sec, nsec).Sub(now) timer := time.NewTimer(d) //catch a chan and do something for { select { //if time has expired do task and shift key if is task list case -timer.C: scheduler.doAndReset(key) if task != nil { //fmt.Println(scheduler.tasks[key]) go task.RunJob() timer.Stop() //if add task case -scheduler.add: timer.Stop() // remove task with remove uuid case uuidstr := -scheduler.remove: 
scheduler.removeTask(uuidstr) timer.Stop() //if get a stop single exit case -scheduler.stop: timer.Stop() return break //return a task and key In task list func (scheduler *TaskScheduler) GetTask() (task TaskGetInterface, tempKey int) { scheduler.Lock() defer scheduler.UnLock() min := scheduler.tasks[0].GetRunTime() tempKey = 0 for key, task := range scheduler.tasks { tTime := task.GetRunTime() if min = tTime { continue if min tTime { tempKey = key min = tTime continue task = scheduler.tasks[tempKey] return task, tempKey //if add a new task and runtime now task runtime // stop now timer and again func (scheduler *TaskScheduler) doAndReset(key int) { scheduler.Lock() defer scheduler.UnLock() //null pointer if key len(scheduler.tasks) { nowTask := scheduler.tasks[key] scheduler.tasks = append(scheduler.tasks[:key], scheduler.tasks[key+1:]...) if nowTask.GetSpacing() 0 { tTime := nowTask.GetRunTime() nowTask.SetRuntime(nowTask.GetSpacing() * int64(time.Second) + tTime) number := nowTask.GetRunNumber() if number 1 { nowTask.SetRunNumber(number - 1) scheduler.tasks = append(scheduler.tasks, nowTask) } else if nowTask.GetEndTime() = tTime { scheduler.tasks = append(scheduler.tasks, nowTask)
if task.GetUuid() == uuidStr { scheduler.tasks = append(scheduler.tasks[:key], scheduler.tasks[key+1:]...) break //lock task [] func (scheduler *TaskScheduler) Lock() { scheduler.lock = true //unlock task [] func (scheduler *TaskScheduler) UnLock() { scheduler.lock = false if len(scheduler.swap) 0 { for _, task := range scheduler.swap { scheduler.tasks = append(scheduler.tasks, task) scheduler.swap = make([]TaskInterface, 0) }
一般用法:
// Demo: consume messages from a channel while two independent timers fire
// periodically in the same select loop.
package main

import (
	"fmt"
	"time"
)

func main() {
	input := make(chan interface{})

	// Producer: sends five integers followed by one string, then exits.
	go func() {
		for i := 0; i < 5; i++ {
			input <- i
		}
		input <- "hello, world"
	}()

	t1 := time.NewTimer(time.Second * 5)
	t2 := time.NewTimer(time.Second * 10)

	// The loop never terminates; the program runs until it is killed.
	for {
		select {
		// Consumer: print every message received from the producer.
		case msg := <-input:
			fmt.Println(msg)
		// A Timer fires only once, so it is re-armed after each expiry to
		// make it behave periodically.
		case <-t1.C:
			println("5s timer")
			t1.Reset(time.Second * 5)
		case <-t2.C:
			println("10s timer")
			t2.Reset(time.Second * 10)
		}
	}
}
上面代码中的这个 C 是啥呢,我们去源码看看,以 timer 为例:
type Timer struct {
C -chan Time
r runtimeTimer
}
原来是一个 channel。其实有 Go 基础的都知道,Go 的代码里出现 `<-` 运算符的时候(无论是发送 `ch <- v` 还是接收 `<-ch`),必然有一端是 channel。按照上面的例子来看,就是阻塞在一个 for 循环内,等待定时器的 C 这个 channel 送出值;当获取到值的时候,进行想要的操作。
设计我们的定时任务队列
当时的需求是这样:需要接收客户端的请求并产生一个定时任务,任务会在固定时间执行,可能执行一次,也可能执行多次,也可能到指定时间自动停止;另外,在任务结束之前,还要能够把它手动停止掉。
具体的流程如下图所示:
定义结构
// OnceCron is the timer-driven task queue.
type OnceCron struct {
	tasks  []*Task       // queue of pending tasks
	add    chan *Task    // signalled when a new task arrives
	remove chan string   // signalled when a task is to be removed
	stop   chan struct{} // signalled when the queue should stop
	Logger *log.Logger   // logger
}

// Job is the unit of work carried by a Task.
type Job interface {
	Run() // execute the job
}

// Task describes one scheduled job and its timing parameters.
type Task struct {
	Job     Job    // the job to execute
	Uuid    string // task identifier, used when removing the task
	RunTime int64  // execution time
	Spacing int64  // interval between runs
	EndTime int64  // end time
	Number  int    // total number of runs
}
首先,我们要获得一个队列任务
`func NewCron() *OnceCron`:常规操作,为了节省篇幅,就不写出来了,具体可以看源码,贴在了底部。
然后,开始定时器队列的运行,一般,都会命名为 Start。那么就有一个问题,我们刚开始启动程序的时候,这个时候是没有任务队列,那岂不是 for{select{}} 在等待个毛毛球?所以,我们需要在 Start 的时候添加一个默认的任务,防止队列退出。
func (one *OnceCron) Start() { //初始化的时候加入一个一年的长定时器,间隔1小时执行一次 task := getTaskWithFuncSpacing(3600, time.Now().Add(time.Hour*24*365).Unix() , func() { log.Println( It s a Hour timer! ) }) //为了代码格式 markdown 里面有个括号改成全角了 one.tasks = append(one.tasks, task) go one.run() //协成执行 防止主进程被阻塞 }
执行部分是重点,分成三部分:
remove: make(chan string), Logger: log.New(os.Stdout, [Control]: , log.Ldate|log.Ltime|log.Lshortfile), lock: false, //add spacing time job to list with number func (scheduler *TaskScheduler) AddFuncSpaceNumber(spaceTime int64, number int, f func()) { task := getTaskWithFuncSpacingNumber(spaceTime, number, f) scheduler.addTask(task) //add spacing time job to list with endTime func (scheduler *TaskScheduler) AddFuncSpace(spaceTime int64, endTime int64, f func()) { task := getTaskWithFuncSpacing(spaceTime, endTime, f) scheduler.addTask(task) //add func to list func (scheduler *TaskScheduler) AddFunc(unixTime int64, f func()) { task := getTaskWithFunc(unixTime, f) scheduler.addTask(task) func (scheduler *TaskScheduler) AddTaskInterface(task TaskInterface) { scheduler.addTask(task) //add a task to list func (scheduler *TaskScheduler) AddTask(task *Task) string { if task.RunTime != 0 { if task.RunTime 100000000000 { task.RunTime = task.RunTime * int64(time.Second) if task.RunTime time.Now().UnixNano() { //延遲1秒 task.RunTime = time.Now().UnixNano() + int64(time.Second) } else { if task.Spacing 0 { task.RunTime = time.Now().UnixNano() + task.Spacing * int64(time.Second) }else{ scheduler.Logger.Println( error too add task! 
Runtime error ) return if task.Uuid == { task.Uuid = uuid.New().String() return scheduler.addTask(task) //if lock add to swap func (scheduler *TaskScheduler) addTask(task TaskInterface) string { if scheduler.lock { scheduler.swap = append(scheduler.swap, task) scheduler.add - task } else{ scheduler.tasks = append(scheduler.tasks, task) scheduler.add - task return task.GetUuid() //new export func (scheduler *TaskScheduler) ExportInterface() []TaskInterface { return scheduler.tasks //compatible old export tasks func (scheduler *TaskScheduler) Export() []*Task { task := make([]*Task,0) for _,v := range scheduler.tasks { task = append(task, v.(*Task)) return task //stop task with uuid func (scheduler *TaskScheduler) StopOnce(uuidStr string) { scheduler.remove - uuidStr //run Cron func (scheduler *TaskScheduler) Start() { //初始化的时候加入一个一年的长定时器,间隔1小时执行一次 task := getTaskWithFuncSpacing(3600, time.Now().Add(time.Hour * 24 * 365).UnixNano(), func() { log.Println( It s a Hour timer! ) scheduler.tasks = append(scheduler.tasks, task) go scheduler.run() //stop all func (scheduler *TaskScheduler) Stop() { scheduler.stop - struct{}{} //run task list //if is empty, run a year timer task func (scheduler *TaskScheduler) run() { for { now := time.Now() task, key := scheduler.GetTask() runTime := task.GetRunTime() i64 := runTime - now.UnixNano() var d time.Duration if i64 0 { scheduler.tasks[key].SetRuntime(now.UnixNano()) if task != nil { go task.RunJob() scheduler.doAndReset(key) continue } else { sec := runTime / int64(time.Second) nsec := runTime % int64(time.Second) d = time.Unix(sec, nsec).Sub(now) timer := time.NewTimer(d) //catch a chan and do something for { select { //if time has expired do task and shift key if is task list case -timer.C: scheduler.doAndReset(key) if task != nil { //fmt.Println(scheduler.tasks[key]) go task.RunJob() timer.Stop() //if add task case -scheduler.add: timer.Stop() // remove task with remove uuid case uuidstr := -scheduler.remove: 
scheduler.removeTask(uuidstr) timer.Stop() //if get a stop single exit case -scheduler.stop: timer.Stop() return break //return a task and key In task list func (scheduler *TaskScheduler) GetTask() (task TaskGetInterface, tempKey int) { scheduler.Lock() defer scheduler.UnLock() min := scheduler.tasks[0].GetRunTime() tempKey = 0 for key, task := range scheduler.tasks { tTime := task.GetRunTime() if min = tTime { continue if min tTime { tempKey = key min = tTime continue task = scheduler.tasks[tempKey] return task, tempKey //if add a new task and runtime now task runtime // stop now timer and again func (scheduler *TaskScheduler) doAndReset(key int) { scheduler.Lock() defer scheduler.UnLock() //null pointer if key len(scheduler.tasks) { nowTask := scheduler.tasks[key] scheduler.tasks = append(scheduler.tasks[:key], scheduler.tasks[key+1:]...) if nowTask.GetSpacing() 0 { tTime := nowTask.GetRunTime() nowTask.SetRuntime(nowTask.GetSpacing() * int64(time.Second) + tTime) number := nowTask.GetRunNumber() if number 1 { nowTask.SetRunNumber(number - 1) scheduler.tasks = append(scheduler.tasks, nowTask) } else if nowTask.GetEndTime() = tTime { scheduler.tasks = append(scheduler.tasks, nowTask)
if task.GetUuid() == uuidStr { scheduler.tasks = append(scheduler.tasks[:key], scheduler.tasks[key+1:]...) break //lock task [] func (scheduler *TaskScheduler) Lock() { scheduler.lock = true //unlock task [] func (scheduler *TaskScheduler) UnLock() { scheduler.lock = false if len(scheduler.swap) 0 { for _, task := range scheduler.swap { scheduler.tasks = append(scheduler.tasks, task) scheduler.swap = make([]TaskInterface, 0) }
23123.html
go相关文章
- go 语言版本控制器
- 「Go工具箱」go语言csrf库的使用方式和实现原理
- go 并发模式
- 「Go工具箱」将文件大小转换成Kb、Mb、Gb就用这个库:go-humanize
- GO语言开篇-Go语言急速入门(基础知识点)| 青训营笔记
- Go-包管理-go get(二)
- go pprof命令(Go语言性能分析命令)完全攻略
- Go-连接Redis-学习go-redis包详解编程语言
- 程序Linux上运行Go语言程序的指南(linux运行go)
- 简易教程:如何在 Linux 上安装 GO 语言(go语言安装linux)
- 以Go语言操作MySQL:轻松运行数据库各种查询(go语言mysql)
- 用Go语言深入Redis缓存技术(用go写redis)
- Go语言快速安装Oracle数据库(go语言安装oracle)
- 数据库探索Go语言与Oracle数据库的结合(go语言中的oracle)
- 程序Go语言调用Oracle数据库驱动程序指南(go oracle驱动)
- 语句使用Oracle中的Go语句来简化编程(oracle中go)