生产者消费者模型
源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package mainimport ( "fmt" "time" )const ( consumerCount = 10 )func producer (ch chan <- string , something string ) { ch <- something }func consumers (ch <-chan string ) { for i := 0 ; i < consumerCount; i++ { go func (i int ) { for something := range ch { fmt.Println(i, something) } }(i) } }func main () { ch := make (chan string , 10 ) consumers(ch) for { producer(ch, "Hello, World!" ) time.Sleep(time.Second * 1 ) } }
实践 - woker pool
我的封装:提前创建好集合几个worker (goroutine),然后等待channel中的数据
https://github.com/gw-gong/gwkit-go/tree/master/concurrency/work_pool
参数怎么设置? 工作池的三个核心参数(channelSize
、workerPoolSize
、timeoutSubmit
)的设置需要结合任务特性 (CPU 密集 / IO 密集)、系统资源 (CPU 核心数、内存)和业务需求 (延迟容忍度、峰值压力)综合判断。以下是通用设置原则和典型场景建议:
workerPoolSize
(worker 数量,并发处理任务的 goroutine 数)
作用 :控制同时执行任务的并发数,避免无限制创建 goroutine 导致的资源耗尽。
设置原则 :
核心依据是任务类型 (CPU 密集型 vs IO 密集型)和CPU 核心数 。
CPU 密集型任务 (如数据计算、加密解密):
任务主要消耗 CPU 资源,过多的并发会导致频繁的上下文切换,反而降低效率。
建议值:workerPoolSize = CPU核心数 ± 1
(例如 8 核 CPU 设置为 8 或 9)。
原理:充分利用 CPU 核心,避免切换开销。
IO 密集型任务 (如数据库查询、网络请求、文件 IO):
任务大部分时间在等待(IO 阻塞),CPU 利用率低,可通过增加并发数提高吞吐量。
建议值:workerPoolSize = CPU核心数 × 5 ~ 10
(例如 8 核 CPU 设置为 40~80)。
原理:利用等待时间并行处理更多任务,提升整体效率。
极端场景 :
如果任务有严格的 QPS 限制(如调用第三方 API 有频率限制),workerPoolSize
需小于等于限制值(例如 API 限频 100QPS,worker 数不超过 100)。
channelSize
(任务队列缓冲区大小)
作用 :缓冲等待处理的任务,避免任务提交时因 worker 繁忙而立即阻塞,吸收短时间的任务提交峰值。
设置原则 :
核心依据是任务提交频率波动 和worker 处理速度 。
建议值:channelSize = workerPoolSize × 2 ~ 5
(例如 worker 数为 20 时,队列大小设为 40~100)。
理由:
过小(如小于 worker 数):队列容易满,导致提交频繁超时,失去缓冲意义。
过大(如远大于 worker 数 ×10):会占用过多内存(每个任务都有内存开销),且任务积压过多时,处理延迟会显著增加。
特殊场景:
若任务提交峰值极高(如秒杀场景短时间内涌入大量任务),可适当调大(如workerPoolSize × 10
),避免瞬间提交失败。
若任务内存占用大(如大文件处理),需减小channelSize
,避免 OOM。
timeoutSubmit
(任务提交超时时间)
作用 :控制任务提交到队列的最大等待时间,避免提交方因队列满而无限阻塞。
设置原则 :
核心依据是业务对延迟的容忍度 和队列处理速度 。
建议值:500ms ~ 10s
(根据业务场景调整)。
理由:
短超时(如 500ms~2s):适合对实时性要求高的场景(如 Web 请求处理),避免用户等待过久。
长超时(如 5s~10s):适合后台任务(如日志处理、数据同步),允许稍等队列空闲后再提交,减少任务丢失。
关联调整:
若channelSize
较大(缓冲能力强),timeoutSubmit
可适当缩短(队列满的概率低,无需等太久)。
若workerPoolSize
较小(处理慢),timeoutSubmit
可适当延长(给队列更多时间消化任务)。
总结:典型场景配置示例
场景
任务类型
workerPoolSize(8 核 CPU)
channelSize
timeoutSubmit
Web 服务实时处理
IO 密集(DB 查询)
40~60
80~120(2×worker)
1~2s
后台数据计算
CPU 密集
8~10
20~40(4×worker)
5~10s
高峰值任务(如秒杀)
IO 密集(订单处理)
80~100
800~1000(10×worker)
3~5s
轻量定时任务
混合类型
10~20
30~60(3×worker)
2~3s
调优建议
监控调整 :通过QueueLength()
监控队列积压情况,若频繁满队列(导致提交超时),可增加channelSize
或workerPoolSize
。
压力测试 :在上线前用压测工具模拟峰值流量,观察任务处理延迟和提交成功率,逐步调整参数。
动态适配 :对于超大规模系统,可实现动态调整workerPoolSize
(如根据 CPU 利用率扩缩容),但复杂度较高,中小型系统固定参数即可。
核心原则:“够用就好” ,避免过度配置(如盲目调大 worker 数或队列大小)导致资源浪费。
简单使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package mainimport ( "fmt" "time" "github.com/gw-gong/gwkit-go/concurrency/work_pool" )type someWork struct { workId int workName string }func (w *someWork) Do() { fmt.Println(w.workId, w.workName) }func main () { workPool, err := work_pool.NewWorkerPool(32 , 16 ) if err != nil { panic (err) } for i := 0 ; ; i++ { if err := workPool.Submit(&someWork{workId: i, workName: fmt.Sprintf("work%d" , i)}); err != nil { fmt.Println(err) return } time.Sleep(time.Millisecond * 500 ) } }
实践 - ants work pool
https://github.com/panjf2000/ants
文档:https://pkg.go.dev/github.com/panjf2000/ants/v2
1 go get github.com/panjf2000/ants/v2
1 2 ants.WithDisablePurge(false ), ants.WithExpiryDuration(time.Second*10 ),
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 package mainimport ( "context" "fmt" "math/rand" "time" "github.com/gw-gong/gwkit-go/concurrency/ants_tools" "github.com/gw-gong/gwkit-go/log" common_utils "github.com/gw-gong/gwkit-go/utils/common" "github.com/panjf2000/ants/v2" )func main () { syncFn, err := log.InitGlobalLogger(log.NewDefaultLoggerConfig()) defer syncFn() common_utils.ExitOnErr(context.Background(), err) multiPool, err := ants.NewMultiPool(10 , 10 , ants.RoundRobin, ants.WithDisablePurge(false ), ants.WithExpiryDuration(time.Second*5 ), ants.WithPanicHandler(common_utils.DefaultPanicHandler), ants.WithLogger(&ants_tools.SugarLogger{Ctx: context.Background()}), ants.WithMaxBlockingTasks(100 ), ) if err != nil { common_utils.ExitOnErr(context.Background(), err) } defer multiPool.ReleaseTimeout(time.Second * 10 ) go func () { for { log.Info("runtime info" , log.Int("multiPool.Running()" , multiPool.Running()), log.Int("multiPool.Free()" , multiPool.Free()), log.Int("multiPool.Waiting()" , multiPool.Waiting()), log.Int("multiPool.Cap()" , multiPool.Cap()), ) time.Sleep(time.Second * 1 ) } }() ticker := time.NewTicker(time.Second * 20 ) for stop := false ; !stop; { multiPool.Submit(func () { if rand.Intn(100 ) < 10 { panic ("test panic" ) } fmt.Println("Hello, World!" ) time.Sleep(time.Second * 3 ) }) select { case <-ticker.C: log.Info("stop submit" ) stop = true default : time.Sleep(time.Second * 1 ) } } select {} }
发布订阅模型
一、基本概念
发布订阅(Publish-Subscribe,简称 Pub/Sub)是一种消息传递模式 ,核心思想是通过 “中间层” 解耦消息的生产者(发布者)和消费者(订阅者),实现两者的 “无直接依赖通信”。
发布者(Publisher):发送消息的一方,无需知道谁会接收消息,仅需将消息发送到指定 “主题”。
订阅者(Subscriber):接收消息的一方,无需知道谁发送消息,仅需订阅感兴趣的 “主题”,即可接收该主题下的消息。
核心特点:发布者和订阅者完全解耦,彼此不知道对方的存在,通过 “主题” 关联。
二、核心组件
发布者(Publisher)
消息的生产者,负责向 “主题” 发送消息,不关心消息的接收者。
订阅者(Subscriber)
消息的消费者,通过订阅 “主题” 获取消息,只关注自己订阅的主题,不关心消息的发布者。
主题(Topic)
消息的分类标签(类似 “频道”),是发布者和订阅者的关联媒介。发布者向特定主题发送消息,订阅者通过订阅主题接收消息。
消息代理(Broker)
中间件角色,负责管理主题、接收发布者的消息、存储消息(可选)、并将消息转发给所有订阅该主题的订阅者。
(简单实现中可省略 broker,直接通过主题关联,但复杂场景必须依赖 broker 保证可靠性)
三、工作流程
订阅者向 broker 订阅感兴趣的主题(如 “用户注册事件”“订单支付通知”)。
发布者向 broker 发布消息,并指定消息所属的主题。
broker 接收消息后,检查该主题的所有订阅者,将消息转发给每个订阅者。
订阅者接收并处理消息。
四、与其他模式的对比
模式
核心差异
适用场景
观察者模式
无中间 broker,观察者直接依赖被观察者(内存内交互,紧耦合)
单进程内的事件通知(如 UI 组件)
点对点(P2P)
消息通过 “队列” 传递,一条消息仅被一个消费者处理(负载均衡场景)
任务分发、订单处理
Pub/Sub
消息通过 “主题” 传递,一条消息被所有订阅者接收(广播场景),有 broker 解耦
多接收方的事件通知、日志分发
五、应用场景
实时通知 :如聊天消息(群聊中所有人接收)、APP 推送(订阅 “系统通知” 主题的用户都收到)。
日志 / 监控 :多个服务订阅 “系统日志” 主题,分别处理日志分析、告警等。
事件驱动架构 :微服务间通过事件通信(如 “订单创建” 事件触发库存扣减、物流创建等)。
实时数据同步 :如股票行情、实时数据仪表盘(所有订阅者实时获取更新)。
六、优缺点
优点
解耦性强 :发布者和订阅者完全隔离(时间解耦:无需同时在线;空间解耦:无需知道对方地址;数量解耦:增减双方不影响彼此)。
可扩展性好 :新增发布者或订阅者无需修改现有系统,只需关联主题即可。
灵活性高 :支持一对多通信(广播),满足多接收方场景。
缺点
消息可靠性风险 :若 broker 无持久化,订阅者离线可能丢失消息(需通过持久化、重试机制弥补)。
延迟可能增加 :消息需经过 broker 转发,相比直接调用可能增加延迟。
复杂度提升 :引入 broker 后需维护中间件,增加系统部署和运维成本。
七、Golang 简单实现示例(基于 channel)
利用 Go 的 goroutine 和 channel 可快速实现一个简易 Pub/Sub 模型(无持久化,适合单进程内使用):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 package mainimport ( "fmt" "sync" )type Topic struct { mu sync.RWMutex subscribers map [string ]chan string }type Broker struct { mu sync.RWMutex topics map [string ]*Topic }func NewBroker () *Broker { return &Broker{ topics: make (map [string ]*Topic), } }func (b *Broker) getTopic(name string ) *Topic { b.mu.RLock() topic, ok := b.topics[name] b.mu.RUnlock() if !ok { b.mu.Lock() defer b.mu.Unlock() if topic, ok = b.topics[name]; !ok { topic = &Topic{ subscribers: make (map [string ]chan string ), } b.topics[name] = topic } } return topic }func (b *Broker) Subscribe(topicName, subscriberID string ) chan string { topic := b.getTopic(topicName) ch := make (chan string , 10 ) topic.mu.Lock() defer topic.mu.Unlock() topic.subscribers[subscriberID] = ch return ch }func (b *Broker) Unsubscribe(topicName, subscriberID string ) { b.mu.RLock() topic, ok := b.topics[topicName] b.mu.RUnlock() if !ok { return } topic.mu.Lock() defer topic.mu.Unlock() delete (topic.subscribers, subscriberID) }func (b *Broker) Publish(topicName, message string ) { b.mu.RLock() topic, ok := b.topics[topicName] b.mu.RUnlock() if !ok { return } topic.mu.RLock() defer topic.mu.RUnlock() for _, ch := range topic.subscribers { select { case ch <- message: default : fmt.Printf("消息 [%s] 发送给订阅者失败(channel 满)\n" , message) } } }func main () { broker := NewBroker() sub1Ch := broker.Subscribe("news" , "sub1" ) go func () { for msg := range sub1Ch { fmt.Println("sub1 收到消息:" , msg) } }() sub2Ch := broker.Subscribe("news" , "sub2" ) go func () { for msg := range sub2Ch { fmt.Println("sub2 收到消息:" , msg) } }() broker.Publish("news" , "Go 1.21 发布了!" ) broker.Publish("news" , "Pub/Sub 模型真好用~" ) broker.Unsubscribe("news" , "sub1" ) broker.Publish("news" , "sub1 已取消订阅,看不到这条消息" ) fmt.Scanln() }
八、扩展思考
生产环境中,需使用成熟中间件(如 Redis Pub/Sub、RabbitMQ、Kafka),支持消息持久化、重试、分区等高级特性。
需考虑订阅者离线后的消息补偿(如 Kafka 的 offset 机制)。
可通过 “主题层级”(如 news.sports
、news.tech
)实现更灵活的订阅策略。
最快响应优先模型
也可以称为 “Select 多路复用模型”
在 Golang 中,这种 “多 goroutine 并发执行同类操作,取第一个返回结果” 的模式非常常见,通常被称为 “最快响应优先” 模式。这种模式特别适合处理对响应时间敏感、且操作具有幂等性(重复执行无副作用)的场景,比如多源数据获取、服务健康检查等。
核心原理
通过启动多个 goroutine 同时执行相同任务,利用 channel 收集结果,主程序只需等待第一个返回的结果(无论成功或失败),即可终止等待并使用该结果。这样能最大程度减少 “木桶效应”—— 不必等待最慢的那个任务完成,从而提升整体响应速度。
实现示例
以下是一个简单示例:模拟从 3 个不同数据源获取同一用户信息,取第一个返回的结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 package mainimport ( "fmt" "math/rand" "time" )func fetchUserInfo (source string , userID int ) (string , error ) { delay := time.Duration(rand.Intn(500 )) * time.Millisecond time.Sleep(delay) if rand.Float32() < 0.2 { return "" , fmt.Errorf("source %s failed" , source) } return fmt.Sprintf("user %d info from %s" , userID, source), nil }func getFastestUserInfo (userID int ) (string , error ) { resultCh := make (chan string ) errCh := make (chan error ) sources := []string {"sourceA" , "sourceB" , "sourceC" } for _, src := range sources { go func (source string ) { info, err := fetchUserInfo(source, userID) if err != nil { errCh <- err return } resultCh <- info }(src) } errCount := 0 totalSources := len (sources) for { select { case info := <-resultCh: return info, nil case err := <-errCh: errCount++ if errCount == totalSources { return "" , fmt.Errorf("all %d sources failed, last error: %v" , totalSources, err) } } } }func main () { rand.Seed(time.Now().UnixNano()) info, err := getFastestUserInfo(123 ) if err != nil { fmt.Println("Error:" , err) return } fmt.Println("Got result:" , info) }
关键技术点
channel 通信 :用 channel 收集各个 goroutine 的结果(或错误),主程序通过 channel 获取第一个响应。
select 语句 :通过select
监听多个 channel,优先处理第一个就绪的 channel(即第一个返回结果的 goroutine)。(ps: 只要有未阻塞的case, 就一定不会执行default)
goroutine 管理 :虽然示例中没有显式终止未完成的 goroutine,但由于它们完成后会自动退出,且结果会被 channel 缓冲(不会阻塞),因此不会导致 goroutine 泄漏。
错误处理 :需要考虑部分 goroutine 失败的情况,示例中通过多层 select 确保 “只要有一个成功就返回”,全失败才返回错误。
注意事项
幂等性要求 :操作必须是幂等的(重复执行不会产生副作用),比如读取操作。写入操作慎用,可能导致数据不一致。
资源控制 :避免启动过多 goroutine 导致资源耗尽,可根据实际需求限制并发数。
超时机制 :建议添加全局超时,防止所有 goroutine 都卡住(例如在select
中增加case <-time.After(timeout)
)。
结果有效性 :如果对结果准确性有要求,需确保所有 goroutine 返回的结果是一致的(或可接受的)。
这种模式在微服务架构中尤为实用,比如多实例部署的服务调用、多 CDN 节点资源拉取等场景,能有效降低整体响应时间。
控制异步任务场景下的最大goroutine
在大流量函数中,异步启动 goroutine 执行任务时非常有必要控制 goroutine 的数量 。原因和控制方法如下:
为什么需要控制 goroutine 数量?
尽管 goroutine 是轻量级的(初始栈大小仅 2KB,可动态扩展),但在大流量场景下无限制创建会带来严重问题:
资源耗尽风险 :每个 goroutine 会占用内存(栈空间、调度元数据等),大流量下瞬间创建数万甚至数十万 goroutine 可能导致内存暴增,触发 OOM(内存溢出)。
调度开销激增 :Go 调度器需要在大量 goroutine 间切换,当数量超过 CPU 核心数过多时,上下文切换成本会显著增加,反而降低执行效率。
下游资源过载 :如果 goroutine 的任务涉及访问数据库、调用 API 等,无限制的并发会压垮下游服务(如数据库连接池耗尽、接口限流被触发)。
如何控制 goroutine 数量?
最常用的方案是使用带缓冲的 channel 作为信号量 ,限制并发执行的 goroutine 数量。示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package mainimport ( "log" "sync" )func main () { const maxGoroutines = 100 sem := make (chan struct {}, maxGoroutines) var wg sync.WaitGroup for i := 0 ; i < 1000 ; i++ { sem <- struct {}{} wg.Add(1 ) go func (taskID int ) { defer func () { <-sem wg.Done() }() log.Printf("处理任务 %d" , taskID) }(i) } wg.Wait() }
关键注意点
合理设置并发数 :maxGoroutines
的值需根据任务类型调整。若任务是 CPU 密集型,建议接近 CPU 核心数;若为 IO 密集型(如网络请求),可适当增大(但需匹配下游资源承载能力)。
结合业务监控 :通过 metrics 记录 goroutine 数量、等待队列长度等,动态调整并发限制(如结合配置中心实现热更新)。
避免长时间阻塞 :若 goroutine 内部可能长时间阻塞(如无限期等待锁),需配合超时控制,防止信号量被长期占用导致新任务无法执行。
综上,大流量场景下必须控制 goroutine 数量,这是保障系统稳定性的关键措施。
任务编排
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package mainimport ( "fmt" "sync" "time" )func doTask (taskName string , duration time.Duration, wg *sync.WaitGroup) { defer wg.Done() fmt.Printf("任务 %s 开始执行\n" , taskName) time.Sleep(duration) fmt.Printf("任务 %s 执行完成\n" , taskName) }func main () { var wg sync.WaitGroup tasks := []struct { name string duration time.Duration }{ {"下载文件A" , 1 * time.Second}, {"下载文件B" , 2 * time.Second}, {"下载文件C" , 1500 * time.Millisecond}, } wg.Add(len (tasks)) for _, task := range tasks { go doTask(task.name, task.duration, &wg) } wg.Wait() fmt.Println("所有任务已全部完成,开始汇总结果" ) }
Pipeline(流水线)模型
将复杂任务拆分为多个阶段,每个阶段由一个或多个 goroutine 处理,阶段之间通过 channel 传递数据,形成 “流水线” 式的并发处理流程。
特点:每个阶段专注于处理特定逻辑,通过 channel 串联,提高整体处理效率,适合数据流式处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 package mainimport ( "fmt" "sync" )func generate (nums []int ) <-chan int { out := make (chan int ) go func () { defer close (out) for _, n := range nums { out <- n } }() return out }func filter (in <-chan int ) <-chan int { out := make (chan int ) go func () { defer close (out) for n := range in { if n%2 != 0 { out <- n } } }() return out }func transform (in <-chan int ) <-chan int { out := make (chan int ) go func () { defer close (out) for n := range in { out <- n * 2 } }() return out }func printResults (in <-chan int , wg *sync.WaitGroup) { defer wg.Done() for res := range in { fmt.Printf("最终结果: %d\n" , res) } }func main () { nums := []int {1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 } genCh := generate(nums) filterCh := filter(genCh) transformCh := transform(filterCh) var wg sync.WaitGroup wg.Add(1 ) go printResults(transformCh, &wg) wg.Wait() fmt.Println("流水线处理完成" ) }
参考:https://chai2010.cn/advanced-go-programming-book/ch1-basic/ch1-06-goroutine.html