go: Worker Pool Pattern

📅 2026/6/21 20:52:14
go: Worker Pool Pattern
项目结构/* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 17:56 # User : geovindu # Product : GoLand # Project : godesginpattern # File : logger.go */ package utils import ( godesginpattern/workerpool/config io log os ) var Logger *log.Logger func InitLogger() { // 打开日志文件 file, err : os.OpenFile(config.LogFileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err ! nil { log.Fatal(日志文件创建失败:, err) } // 同时输出到控制台 文件 multiWriter : io.MultiWriter(os.Stdout, file) // 初始化日志 Logger log.New(multiWriter, , config.LogFlag) } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 18:12 # User : geovindu # Product : GoLand # Project : godesginpattern # File : retry.go */ package utils import ( godesginpattern/workerpool/config time ) // Retry 执行函数并自动重试 func Retry(task func() error) error { var err error for i : 0; i config.MaxRetryTimes; i { err task() if err nil { return nil } Logger.Printf(任务失败第 %d 次重试错误: %v, i1, err) time.Sleep(500 * time.Millisecond) } return err } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 18:12 # User : geovindu # Product : GoLand # Project : godesginpattern # File : monitor.go */ package utils import sync/atomic type Monitor struct { Running int64 Waiting int64 Finished int64 Failed int64 } func (m *Monitor) IncRunning() { atomic.AddInt64(m.Running, 1) } func (m *Monitor) DecRunning() { atomic.AddInt64(m.Running, -1) } func (m *Monitor) IncWaiting() { atomic.AddInt64(m.Waiting, 1) } func (m *Monitor) DecWaiting() { atomic.AddInt64(m.Waiting, -1) } func (m *Monitor) IncFinished() { atomic.AddInt64(m.Finished, 1) } func (m *Monitor) IncFailed() { atomic.AddInt64(m.Failed, 1) } func (m *Monitor) Get() (running, waiting, finished, failed int64) { return atomic.LoadInt64(m.Running), atomic.LoadInt64(m.Waiting), atomic.LoadInt64(m.Finished), atomic.LoadInt64(m.Failed) } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 17:58 # User : geovindu # Product : GoLand # Project : godesginpattern # File : settings.go */ package config import log const ( WorkerCount 3 QueueMaxSize 100 MinTaskDelay 0.3 MaxTaskDelay 0.8 MaxRetryTimes 3 LogFileName jewelry.log DBPath jewelry.db HTTPPort :8080 MonitorInterval 2 ) const LogFlag log.LstdFlags | log.Lmicroseconds /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 18:01 # User : geovindu # Product : GoLand # Project : godesginpattern # File : task.go */ package tasks import ( godesginpattern/workerpool/config math/rand time ) func RawMaterialCheck(orderID string) error { simulateDelay() return nil } func JewelryProcess(orderID string) error { simulateDelay() return nil } func FinishedGoodsCheck(orderID string) error { simulateDelay() return nil } func InventoryRecord(orderID string) error { simulateDelay() return nil } func OrderDelivery(orderID string) error { simulateDelay() return nil } var FullProcessTasks []func(string) error{ RawMaterialCheck, JewelryProcess, FinishedGoodsCheck, InventoryRecord, OrderDelivery, } func simulateDelay() { rand.Seed(time.Now().UnixNano()) delay : config.MinTaskDelay rand.Float64()*(config.MaxTaskDelay-config.MinTaskDelay) time.Sleep(time.Duration(delay*1000) * time.Millisecond) } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 18:01 # User : geovindu # Product : GoLand # Project : godesginpattern # File : worker_pool.go */ package core import ( godesginpattern/workerpool/config godesginpattern/workerpool/utils os os/signal sync syscall time ) type Task struct { OrderID string Func func(string) error } type WorkerPool struct { workerCount int taskChan chan *Task wg sync.WaitGroup monitor *utils.Monitor quit chan os.Signal } func NewWorkerPool(workerCnt int, queueSize int) *WorkerPool { wp : WorkerPool{ workerCount: workerCnt, taskChan: make(chan *Task, queueSize), monitor: utils.Monitor{}, quit: make(chan os.Signal, 1), } signal.Notify(wp.quit, syscall.SIGINT, syscall.SIGTERM) return wp } func (wp *WorkerPool) worker(id int) { utils.Logger.Printf(Worker %d 已启动, id) for { select { case task, ok : -wp.taskChan: if !ok { utils.Logger.Printf(Worker %d 安全退出, id) return } wp.monitor.DecWaiting() wp.monitor.IncRunning() utils.Logger.Printf(Worker %d 开始任务: %s, id, task.OrderID) err : utils.Retry(func() error { return task.Func(task.OrderID) }) if err ! nil { utils.Logger.Printf(Worker %d 任务失败: %s, 错误: %v, id, task.OrderID, err) } else { utils.Logger.Printf(Worker %d 完成任务: %s, id, task.OrderID) } wp.monitor.DecRunning() wp.monitor.IncFinished() wp.wg.Done() case -wp.quit: utils.Logger.Printf(Worker %d 收到关闭信号退出, id) return } } } func (wp *WorkerPool) Start() { utils.Logger.Println(工作池启动) for i : 1; i wp.workerCount; i { go wp.worker(i) } go wp.monitorLoop() } func (wp *WorkerPool) Submit(task *Task) { wp.wg.Add(1) wp.monitor.IncWaiting() wp.taskChan - task } func (wp *WorkerPool) monitorLoop() { ticker : time.NewTicker(time.Duration(config.MonitorInterval) * time.Second) defer ticker.Stop() for { select { case -ticker.C: r, w, f, failed : wp.monitor.Get() utils.Logger.Printf([监控] 运行:%d | 等待:%d | 完成:%d | 失败:%d, r, w, f, failed) // 运行、等待都为0代表所有任务全部完成 if r 0 w 0 { utils.Logger.Println(所有任务处理完成自动停止监控输出) return } case -wp.quit: utils.Logger.Println(监控循环收到关闭信号停止监控输出) return } } } func (wp *WorkerPool) Wait() { // 阻塞等待 CtrlC / kill 信号 -wp.quit utils.Logger.Println(优雅关闭中...) // 关闭任务通道worker 不再接收新任务 close(wp.taskChan) // 等待正在执行的任务全部处理完毕 wp.wg.Wait() utils.Logger.Println(✅ 所有任务执行完毕服务安全退出) }调用/* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 18:02 # User : geovindu # Product : GoLand # Project : godesginpattern # File : workerpoolbll.go */ package bll import ( fmt godesginpattern/workerpool/config godesginpattern/workerpool/core godesginpattern/workerpool/tasks godesginpattern/workerpool/utils ) func WorkerPoolMain() { utils.InitLogger() logger : utils.Logger logger.Println() logger.Println( 珠宝企业级生产系统Go 企业级 Worker Pool) logger.Println() pool : core.NewWorkerPool(config.WorkerCount, config.QueueMaxSize) pool.Start() totalOrder : 10 logger.Printf(开始提交 %d 个珠宝订单\n, totalOrder) for i : 1; i totalOrder; i { orderID : fmt.Sprintf(订单-%03d, i) for _, fn : range tasks.FullProcessTasks { pool.Submit(core.Task{ OrderID: orderID, Func: fn, }) } } logger.Println(✅ 所有订单已提交按 CtrlC 优雅关闭) pool.Wait() }输出