Worker 池并发任务处理器
package main
import (
"fmt"
"sync"
"time"
)
// Task is a unit of work identified by a numeric ID.
type Task struct {
	ID int // identifier shown in the log output
}

// Do logs the task ID together with the ID of the goroutine executing it,
// then sleeps for 500ms to simulate real work.
func (t Task) Do() {
	const simulatedWork = 500 * time.Millisecond

	gid := getGID()
	fmt.Printf("任务 %d 正在执行 by goroutine %d\n", t.ID, gid)
	time.Sleep(simulatedWork)
}
// getGID returns the numeric ID of the calling goroutine, or 0 when the ID
// cannot be parsed from the stack trace header.
//
// It is a debugging aid only and is not recommended for production use.
// It relies on runtime.Stack, so the file must import "runtime".
func getGID() uint64 {
	// Only the first line of the trace ("goroutine <id> ...") is needed, so a
	// small fixed buffer is enough; runtime.Stack truncates anything longer.
	var buf [64]byte
	n := runtime.Stack(buf[:], false)

	var id uint64
	if _, err := fmt.Sscanf(string(buf[:n]), "goroutine %d ", &id); err != nil {
		// Unexpected header format: report "unknown" explicitly instead of
		// silently returning whatever was partially scanned.
		return 0
	}
	return id
}
// worker receives tasks from jobs until the channel is closed and drained.
// For every task it logs the pickup, runs the task, and then marks it as
// finished on wg.
func worker(id int, jobs <-chan Task, wg *sync.WaitGroup) {
	for {
		task, open := <-jobs
		if !open {
			// Channel closed and empty: nothing left to do.
			return
		}
		fmt.Printf("Worker %d 拿到任务 %d\n", id, task.ID)
		task.Do()
		wg.Done()
	}
}
// main starts a pool of 3 workers, queues 10 tasks on a buffered channel,
// and blocks until every task has been processed.
func main() {
	const (
		numWorkers = 3
		numTasks   = 10
	)

	var wg sync.WaitGroup
	// The buffer holds every task, so queueing never blocks.
	jobs := make(chan Task, numTasks)

	// Start the worker pool.
	for id := 1; id <= numWorkers; id++ {
		go worker(id, jobs, &wg)
	}

	// Dispatch the tasks; each one is counted before it is queued.
	for id := 1; id <= numTasks; id++ {
		wg.Add(1)
		jobs <- Task{ID: id}
	}

	// Everything has been queued: closing the channel lets workers exit.
	close(jobs)

	// Wait for all tasks to finish.
	wg.Wait()
	fmt.Println("所有任务已完成 ✅")
}
封装进阶
workerPool.go
package main
import (
"errors"
"fmt"
"math/rand"
"sync"
"time"
)
type Task interface {
Run() error
MaxRetries() int
}
// WorkerPool 支持任务重试的通用结构
type WorkerPool struct {
workerCount int
jobs chan Task
wg sync.WaitGroup
}
func NewWorkerPool(workerCount int) *WorkerPool {
return &WorkerPool{
workerCount: workerCount,
jobs: make(chan Task, 100),
}
}
// 提交任务
func (wp *WorkerPool) Submit(task Task) {
wp.wg.Add(1)
wp.jobs <- task
}
// 启动 worker 池
func (wp *WorkerPool) Start() {
for i := 0; i < wp.workerCount; i++ {
go wp.worker(i)
}
}
// 等待所有任务完成
func (wp *WorkerPool) Wait() {
wp.wg.Wait()
close(wp.jobs)
}
func (wp *WorkerPool) worker(id int) {
for task := range wp.jobs {
retries := 0
max := task.MaxRetries()
for {
err := task.Run()
if err == nil {
break
}
retries++
fmt.Printf("Worker %d: 任务失败,重试 %d/%d,错误:%v\n", id, retries, max, err)
if retries > max {
fmt.Printf("Worker %d: 任务最终失败放弃\n", id)
break
}
time.Sleep(300 * time.Millisecond)
}
wp.wg.Done()
}
}
main.go
package main
// MyTask is a demo task that fails at random.
type MyTask struct {
	ID         int // identifier shown in the success message
	RetryLimit int // value reported by MaxRetries
}

// Run simulates one attempt: it returns an error when the random draw falls
// below 0.5, otherwise it prints a success line and returns nil.
func (t MyTask) Run() error {
	failed := rand.Float32() < 0.5
	if !failed {
		fmt.Printf("任务 %d 成功 ✅\n", t.ID)
		return nil
	}
	return errors.New("模拟失败")
}

// MaxRetries returns the retry limit configured on the task.
func (t MyTask) MaxRetries() int {
	limit := t.RetryLimit
	return limit
}
// main seeds the random source, starts a pool of 3 workers, submits 10 tasks
// that may each be retried twice, and waits for all of them to finish.
func main() {
	const (
		workers    = 3
		taskCount  = 10
		retryLimit = 2
	)

	// NOTE(review): rand.Seed is deprecated since Go 1.20; kept as in the original.
	rand.Seed(time.Now().UnixNano())

	pool := NewWorkerPool(workers)
	pool.Start()

	for id := 1; id <= taskCount; id++ {
		pool.Submit(MyTask{ID: id, RetryLimit: retryLimit})
	}

	pool.Wait()
	fmt.Println("所有任务处理完毕 ✅")
}
函数执行器 超时控制 + 重试机制
控制函数执行时间,超时自动终止
失败时自动重试,最多重试 N 次
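可配置重试间隔:间隔按指数退避逐次翻倍,并叠加随机抖动(Equal Jitter),避免多个调用方同时重试。第 n 次重试(n ≥ 1)前的等待时间为 $\text{sleep} = \frac{b_n}{2} + \mathrm{rand}\left(0, \frac{b_n}{2}\right)$,其中 $b_n = \min\left(\text{base} \times 2^{\,n-1},\ \text{max}\right)$。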
可控制总超时时间或手动取消
retryable.go
package retry
import (
	"context"
	"errors"
	"fmt"
	"math"
	"math/rand"
	"time"
)
// RetryableFunc is the operation executed by Do. It receives a context that
// is cancelled when the per-attempt timeout expires or the caller's context
// is cancelled.
type RetryableFunc func(ctx context.Context) error

// RetryStats summarises one call to Do.
type RetryStats struct {
	RetryCount    int           // retries performed, i.e. attempts after the first one
	TotalDuration time.Duration // wall-clock time spent inside Do
	Success       bool          // true when an attempt returned nil
	LastError     error         // error returned by Do; nil on success
}

// RetryConfig controls how Do retries.
type RetryConfig struct {
	MaxRetries   int              // maximum number of retries after the first attempt; negative values act as 0
	Timeout      time.Duration    // per-attempt timeout; values <= 0 disable the per-attempt timeout
	BaseInterval time.Duration    // base backoff interval
	MaxInterval  time.Duration    // upper bound for one backoff; values <= 0 mean no waiting between attempts
	OnComplete   func(RetryStats) // optional hook, called exactly once before Do returns
}

// errNilFunc is returned by Do when no function was supplied.
var errNilFunc = errors.New("重试函数不能为空")

// Do runs fn until it succeeds, retrying up to cfg.MaxRetries times with
// exponential backoff and equal jitter between attempts.
//
// Each attempt is bounded by cfg.Timeout. Cancelling ctx stops Do before the
// next attempt or during a backoff wait. Errors caused by a timeout or a
// cancellation wrap the underlying context error, so callers can test them
// with errors.Is.
//
// Do returns nil on success, otherwise the error of the last attempt or the
// cancellation error. cfg.OnComplete, when set, is invoked on every exit path.
func Do(ctx context.Context, fn RetryableFunc, cfg RetryConfig) error {
	start := time.Now()
	retries := 0

	// finish reports the outcome through the optional hook and returns err.
	finish := func(err error) error {
		if cfg.OnComplete != nil {
			cfg.OnComplete(RetryStats{
				RetryCount:    retries,
				TotalDuration: time.Since(start),
				Success:       err == nil,
				LastError:     err,
			})
		}
		return err
	}

	// A nil fn would panic inside the attempt goroutine and take the whole
	// process down; fail with an ordinary error instead.
	if fn == nil {
		return finish(errNilFunc)
	}

	for attempt := 0; ; attempt++ {
		if err := ctx.Err(); err != nil {
			return finish(fmt.Errorf("任务被外部取消:%w", err))
		}
		retries = attempt

		err := runAttempt(ctx, fn, cfg.Timeout)
		if err == nil {
			return finish(nil)
		}
		// No retries left: report the error of the final attempt.
		if attempt >= cfg.MaxRetries {
			return finish(err)
		}

		// Wait with jittered exponential backoff, but stay cancellable.
		timer := time.NewTimer(jitteredBackoff(cfg.BaseInterval, attempt, cfg.MaxInterval))
		select {
		case <-ctx.Done():
			timer.Stop()
			return finish(fmt.Errorf("任务在重试等待中被取消:%w", ctx.Err()))
		case <-timer.C:
			// Continue with the next attempt.
		}
	}
}

// runAttempt executes fn once, bounded by timeout when it is positive.
// If fn ignores its context it keeps running in the background after the
// timeout; its result is then discarded.
func runAttempt(ctx context.Context, fn RetryableFunc, timeout time.Duration) error {
	var (
		attemptCtx context.Context
		cancel     context.CancelFunc
	)
	if timeout > 0 {
		attemptCtx, cancel = context.WithTimeout(ctx, timeout)
	} else {
		attemptCtx, cancel = context.WithCancel(ctx)
	}
	defer cancel()

	// Buffered so the goroutine can deliver its result and exit even when
	// nobody is receiving any more.
	done := make(chan error, 1)
	go func() {
		done <- fn(attemptCtx)
	}()

	select {
	case err := <-done:
		return err
	case <-attemptCtx.Done():
		return fmt.Errorf("执行超时:%w", attemptCtx.Err())
	}
}

// jitteredBackoff returns the wait before the next attempt using equal
// jitter: with b = min(base * 2^attempt, limit), the result is drawn from
// [b/2, b). It returns 0 when base or limit is not positive, and never
// overflows for large attempt numbers.
func jitteredBackoff(base time.Duration, attempt int, limit time.Duration) time.Duration {
	if base <= 0 || limit <= 0 {
		return 0
	}

	// Compute base * 2^attempt in floating point so a large attempt becomes
	// +Inf instead of wrapping around; anything at or above limit is capped.
	backoff := limit
	if raw := math.Ldexp(float64(base), attempt); raw < float64(limit) {
		backoff = time.Duration(raw)
	}

	half := backoff / 2
	if half <= 0 {
		// Too small to split; rand.Int63n would panic on a non-positive bound.
		return backoff
	}
	return half + time.Duration(rand.Int63n(int64(half)))
}
main.go
package main
import (
"context"
"errors"
"fmt"
"math/rand"
"time"
"yourpkg/retry"
)
// main demonstrates retry.Do: a task that fails randomly is retried under an
// overall 8-second deadline, and a summary is printed when the run completes.
func main() {
	rand.Seed(time.Now().UnixNano())

	// Outer context bounding the whole run (a manual cancel would work too).
	ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second)
	defer cancel()

	// flakyTask takes about 300ms and fails when the random draw is below 0.8.
	flakyTask := func(ctx context.Context) error {
		fmt.Println("执行任务中...")
		time.Sleep(300 * time.Millisecond)
		if rand.Float32() >= 0.8 {
			fmt.Println("任务成功!")
			return nil
		}
		return errors.New("随机失败")
	}

	// report prints the statistics handed to the completion hook.
	report := func(stats retry.RetryStats) {
		fmt.Printf("📊 Retry Report:\n")
		fmt.Printf("重试次数: %d\n", stats.RetryCount)
		fmt.Printf("总耗时: %v\n", stats.TotalDuration)
		fmt.Printf("成功状态: %v\n", stats.Success)
		if stats.LastError == nil {
			return
		}
		fmt.Printf("最终错误: %v\n", stats.LastError)
	}

	cfg := retry.RetryConfig{
		MaxRetries:   5,
		Timeout:      1 * time.Second,
		BaseInterval: 500 * time.Millisecond,
		MaxInterval:  4 * time.Second,
		OnComplete:   report,
	}

	if err := retry.Do(ctx, flakyTask, cfg); err != nil {
		fmt.Println("最终失败:", err)
		return
	}
	fmt.Println("任务成功完成 ✅")
}