一、下载
1 go get github.com/robfig/cron/v3
二、使用
2.1 创建和添加任务
首先创建一个 cron.Cron 对象,用于管理定时任务。
1 2 3 4 5 c := cron.New() c := cron.New(cron.WithSeconds())
支持的选项有:
WithSeconds:让时间格式支持秒
WithLocation:指定时区
WithParser:使用自定义的解析器
WithLogger:自定义Logger
WithChain:Job 包装器
然后通过 AddFunc 方法向其添加任务函数和执行的时间,这里的执行时间格式比较灵活,在下面详细介绍。
1 2 3 4 5 6 7 func F () { fmt.Println("exec func F" ) } c.AddFunc("30 * * * *" , F)
启动定时执行。
2.2 时间格式
执行时间的格式比较灵活,有几种格式。
crontab 格式
类似于 Linux 中的 crontab 命令的格式,最细粒度为分钟,使用 5 个域来表示,用逗号分隔。
分钟:取值范围 0 - 59
小时:取值范围 0 - 23
天:取值范围 1 - 31
月:取值范围 1 - 12 或 JAN - DEC(不区分大小写)
周几:取值范围 0 - 6 或 SUN - SAT(不区分大小写)
每个字段有灵活的表示方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 * 表示任何值 特定数字表示固定的数字 1 表示固定为 1 / 表示步长 */10 表示所有 10 的倍数的值 5/10 表示从 5 开始每次加 10 的所有值 5-30/10 表示从 5 开始且小于等于 30 的每次加 10 的所有值 - 表示范围 0-11 表示从 0 到 11 的所有值 , 表示列举多个值或范围的并集 1,2,4,7 表示枚举的各个值 0-3,*/10 表示多个值或范围的并集 ? 等同于 *,只能用于天和周几
下面列举几个例子:
1 2 3 4 5 6 7 30 * * * * 每小时的 30 分 0 0-3,10,23 * * * 每天 0-3 点、10 点、23 点的 0 分 0 1 * * * 每天 01:00 */10 12-15 1 * * 每个月 1 号的 12 到 15 点的每 10 分钟
如果创建定时器时带有选项 cron.WithSeconds,则格式应该是 6 段,第一段表示秒。
预定义时间规则
这些是 cron 预定义的一些时间规则:
1 2 3 4 5 @yearly 或 @annually 每年第一天的 0 点 @monthly 每月第一天的 0 点 @weekly 每周第一天(周日)的 0 点 @daily 或 @midnight 每天的 0 点 @hourly 每小时的 0 分
固定时间间隔
从定时器开始运行起,每隔固定时间执行。
1 2 3 4 @every 1s 每秒 @every 2m 每 2 分钟 @every 1h 每小时 @every 1m30s 每 1 分钟 30 秒
三、源码分析
3.1 Cron
cron.Cron
结构体表示定时执行器。所有的任务和调度器保存在 entries
成员内。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 type Cron struct { entries []*Entry chain Chain stop chan struct {} add chan *Entry remove chan EntryID snapshot chan chan []Entry running bool logger Logger runningMu sync.Mutex location *time.Location parser ScheduleParser nextID EntryID jobWaiter sync.WaitGroup }
创建对象。创建使用了选项模式,可以自由地在参数中追加配置,并在函数中依次应用配置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func New (opts ...Option) *Cron { c := &Cron{ entries: nil , chain: NewChain(), add: make (chan *Entry), stop: make (chan struct {}), snapshot: make (chan chan []Entry), remove: make (chan EntryID), running: false , runningMu: sync.Mutex{}, logger: DefaultLogger, location: time.Local, parser: standardParser, } for _, opt := range opts { opt(c) } return c }
添加任务。如果定时器已经在执行,则任务先添加到 add 通道,在定时器循环中获取通道信息再向 entries 切片中加入任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func (c *Cron) AddFunc(spec string , cmd func () ) (EntryID, error ) { return c.AddJob(spec, FuncJob(cmd)) }func (c *Cron) AddJob(spec string , cmd Job) (EntryID, error ) { schedule, err := c.parser.Parse(spec) if err != nil { return 0 , err } return c.Schedule(schedule, cmd), nil }func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID { c.runningMu.Lock() defer c.runningMu.Unlock() c.nextID++ entry := &Entry{ ID: c.nextID, Schedule: schedule, WrappedJob: c.chain.Then(cmd), Job: cmd, } if !c.running { c.entries = append (c.entries, entry) } else { c.add <- entry } return entry.ID }
启动定时执行。在一个 select 中去等待下次任务执行的定时器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 func (c *Cron) Run() { c.runningMu.Lock() if c.running { c.runningMu.Unlock() return } c.running = true c.runningMu.Unlock() c.run() }func (c *Cron) run() { c.logger.Info("start" ) now := c.now() for _, entry := range c.entries { entry.Next = entry.Schedule.Next(now) c.logger.Info("schedule" , "now" , now, "entry" , entry.ID, "next" , entry.Next) } for { sort.Sort(byTime(c.entries)) var timer *time.Timer if len (c.entries) == 0 || c.entries[0 ].Next.IsZero() { timer = time.NewTimer(100000 * time.Hour) } else { timer = time.NewTimer(c.entries[0 ].Next.Sub(now)) } for { select { case now = <-timer.C: now = now.In(c.location) c.logger.Info("wake" , "now" , now) for _, e := range c.entries { if e.Next.After(now) || e.Next.IsZero() { break } c.startJob(e.WrappedJob) e.Prev = e.Next e.Next = e.Schedule.Next(now) c.logger.Info("run" , "now" , now, "entry" , e.ID, "next" , e.Next) } case newEntry := <-c.add: timer.Stop() now = c.now() newEntry.Next = newEntry.Schedule.Next(now) c.entries = append (c.entries, newEntry) c.logger.Info("added" , "now" , now, "entry" , newEntry.ID, "next" , newEntry.Next) case replyChan := <-c.snapshot: replyChan <- c.entrySnapshot() continue case <-c.stop: timer.Stop() c.logger.Info("stop" ) return case id := <-c.remove: timer.Stop() now = c.now() c.removeEntry(id) c.logger.Info("removed" , "entry" , id) } break } } }func (c *Cron) startJob(j Job) { c.jobWaiter.Add(1 ) go func () { defer c.jobWaiter.Done() j.Run() }() }
停止定时器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (c *Cron) Stop() context.Context { c.runningMu.Lock() defer c.runningMu.Unlock() if c.running { c.stop <- struct {}{} c.running = false } ctx, cancel := context.WithCancel(context.Background()) go func () { c.jobWaiter.Wait() cancel() }() return ctx }
3.2 Entry
Entry 结构体包含调度器和要执行的函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 type Entry struct { ID EntryID Schedule Schedule Next time.Time Prev time.Time WrappedJob Job Job Job }type EntryID int type FuncJob func () func (f FuncJob) Run() { f() }
3.3 Chain
Chain 结构体使用装饰器模式,将要执行的函数用其它自定义的行为包含起来,在执行的时候依次执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 type JobWrapper func (Job) Jobtype Chain struct { wrappers []JobWrapper }func NewChain (c ...JobWrapper) Chain { return Chain{c} }func (c Chain) Then(j Job) Job { for i := range c.wrappers { j = c.wrappers[len (c.wrappers)-i-1 ](j) } return j }
3.4 Parser
Parser 是解析定时规则的解析器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 type Parser struct { options ParseOption }type ParseOption int const ( Second ParseOption = 1 << iota SecondOptional Minute Hour Dom Month Dow DowOptional Descriptor )var standardParser = NewParser( Minute | Hour | Dom | Month | Dow | Descriptor, )func WithSeconds () Option { return WithParser(NewParser( Second | Minute | Hour | Dom | Month | Dow | Descriptor, )) }
解析时间格式并返回 Schedule 对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 func (p Parser) Parse(spec string ) (Schedule, error ) { if strings.HasPrefix(spec, "@" ) { if p.options&Descriptor == 0 { return nil , fmt.Errorf("parser does not accept descriptors: %v" , spec) } return parseDescriptor(spec, loc) } fields := strings.Fields(spec) var err error fields, err = normalizeFields(fields, p.options) if err != nil { return nil , err } field := func (field string , r bounds) uint64 { if err != nil { return 0 } var bits uint64 bits, err = getField(field, r) return bits } var ( second = field(fields[0 ], seconds) minute = field(fields[1 ], minutes) hour = field(fields[2 ], hours) dayofmonth = field(fields[3 ], dom) month = field(fields[4 ], months) dayofweek = field(fields[5 ], dow) ) if err != nil { return nil , err } return &SpecSchedule{ Second: second, Minute: minute, Hour: hour, Dom: dayofmonth, Month: month, Dow: dayofweek, Location: loc, }, nil }
3.5 Schedule
Schedule 接口是调度器,只有一个方法 Next,表示任务的下次执行时间。
1 2 3 4 5 6 type Schedule interface { Next(time.Time) time.Time }
SpecSchedule 结构体实现了 Schedule 接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 type SpecSchedule struct { Second, Minute, Hour, Dom, Month, Dow uint64 Location *time.Location }func (s *SpecSchedule) Next(t time.Time) time.Time { origLocation := t.Location() loc := s.Location if loc == time.Local { loc = t.Location() } if s.Location != time.Local { t = t.In(s.Location) } t = t.Add(1 *time.Second - time.Duration(t.Nanosecond())*time.Nanosecond) added := false yearLimit := t.Year() + 5 WRAP: if t.Year() > yearLimit { return time.Time{} } for 1 <<uint (t.Month())&s.Month == 0 { if !added { added = true t = time.Date(t.Year(), t.Month(), 1 , 0 , 0 , 0 , 0 , loc) } t = t.AddDate(0 , 1 , 0 ) if t.Month() == time.January { goto WRAP } } for !dayMatches(s, t) { if !added { added = true t = time.Date(t.Year(), t.Month(), t.Day(), 0 , 0 , 0 , 0 , loc) } t = t.AddDate(0 , 0 , 1 ) if t.Hour() != 0 { if t.Hour() > 12 { t = t.Add(time.Duration(24 -t.Hour()) * time.Hour) } else { t = t.Add(time.Duration(-t.Hour()) * time.Hour) } } if t.Day() == 1 { goto WRAP } } for 1 <<uint (t.Hour())&s.Hour == 0 { if !added { added = true t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0 , 0 , 0 , loc) } t = t.Add(1 * time.Hour) if t.Hour() == 0 { goto WRAP } } for 1 <<uint (t.Minute())&s.Minute == 0 { if !added { added = true t = t.Truncate(time.Minute) } t = t.Add(1 * time.Minute) if t.Minute() == 0 { goto WRAP } } for 1 <<uint (t.Second())&s.Second == 0 { if !added { added = true t = t.Truncate(time.Second) } t = t.Add(1 * time.Second) if t.Second() == 0 { goto WRAP } } return t.In(origLocation) }
四、实践
只起一个实例,然后用锁来控制这单实例的任务冲突问题(保证一时间只有一个cron job在进行)
1 2 3 4 type exclusiveLock uint32 func (el *exclusiveLock) acquire() bool { return atomic.CompareAndSwapUint32((*uint32 )(el), 0 , 1 ) }func (el *exclusiveLock) release() { atomic.StoreUint32((*uint32 )(el), 0 ) }
摘抄:cron:Go定时任务的使用与源码分析