chiachan
chiachan
Published on 2025-01-22 / 6 Visits
0

go语言学习笔记 - channel 实战

#go

Worker 池并发任务处理器

package main

import (
    "fmt"
    "sync"
    "time"
)

// Task is a unit of work identified by a numeric ID.
type Task struct {
    ID int // identifier of the task
}

// Do prints which goroutine is running the task and then sleeps for half a
// second to stand in for real work.
func (t Task) Do() {
    gid := getGID()
    fmt.Printf("任务 %d 正在执行 by goroutine %d\n", t.ID, gid)

    const simulatedWork = 500 * time.Millisecond
    time.Sleep(simulatedWork)
}

// getGID returns the ID of the calling goroutine by parsing the header line
// of its stack trace ("goroutine N ..."). It returns 0 when the header cannot
// be parsed. For debugging only; not recommended for production use.
// It needs the standard "runtime" package to be imported by the file.
func getGID() uint64 {
    var buf [64]byte
    n := runtime.Stack(buf[:], false)

    var gid uint64
    // A failed scan leaves gid at zero, which doubles as the "unknown" result.
    fmt.Sscanf(string(buf[:n]), "goroutine %d ", &gid)
    return gid
}

// worker receives tasks from jobs until the channel is closed and drained.
// For each task it logs the pickup, runs the task, and then marks it as
// finished on wg. id only labels the log output.
func worker(id int, jobs <-chan Task, wg *sync.WaitGroup) {
    for {
        job, open := <-jobs
        if !open {
            return
        }
        fmt.Printf("Worker %d 拿到任务 %d\n", id, job.ID)
        job.Do()
        wg.Done()
    }
}

// main runs a fixed batch of tasks on a small pool of worker goroutines and
// blocks until every task has finished.
func main() {
    const (
        numWorkers = 3
        numTasks   = 10
    )

    var wg sync.WaitGroup
    // The buffer holds the whole batch, so queueing below never blocks.
    jobs := make(chan Task, numTasks)

    // Start the worker pool.
    for w := 1; w <= numWorkers; w++ {
        go worker(w, jobs, &wg)
    }

    // Dispatch the tasks; each one is counted on wg before it is queued.
    for id := 1; id <= numTasks; id++ {
        wg.Add(1)
        jobs <- Task{ID: id}
    }

    // Everything has been queued: closing the channel lets the workers exit
    // once it is drained, and Wait blocks until the last task is done.
    close(jobs)
    wg.Wait()
    fmt.Println("所有任务已完成 ✅")
}

封装进阶

workerPool.go

package main

import (
    "errors"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Task is the unit of work accepted by WorkerPool.
type Task interface {
    // Run executes the task once; a non-nil error marks the attempt as failed.
    Run() error
    // MaxRetries reports how many times a failed task may be retried.
    MaxRetries() int
}

// WorkerPool is a general-purpose pool of goroutines that runs submitted
// tasks and retries the ones that fail.
type WorkerPool struct {
    workerCount int            // number of worker goroutines launched by Start
    jobs        chan Task      // queue of submitted tasks consumed by the workers
    wg          sync.WaitGroup // counts submitted tasks that have not finished yet
}

// NewWorkerPool builds a pool configured for workerCount workers, with a job
// queue buffered to 100 tasks. It does not start any goroutine itself.
func NewWorkerPool(workerCount int) *WorkerPool {
    const queueSize = 100

    pool := new(WorkerPool)
    pool.workerCount = workerCount
    pool.jobs = make(chan Task, queueSize)
    return pool
}

// Submit counts task on the pool's WaitGroup and then queues it for the
// workers. The send blocks while the job queue is full.
func (wp *WorkerPool) Submit(task Task) {
    wp.wg.Add(1) // count the task before a worker can pick it up
    wp.jobs <- task
}

// Start launches the pool's worker goroutines, numbering them from 0.
func (wp *WorkerPool) Start() {
    for id, n := 0, wp.workerCount; id < n; id++ {
        go wp.worker(id)
    }
}

// Wait blocks until every submitted task has finished and then closes the job
// queue. Because it closes the channel, calling Wait a second time or
// submitting a task afterwards panics.
func (wp *WorkerPool) Wait() {
    wp.wg.Wait()
    close(wp.jobs) // ends the range loop in each worker
}

// worker consumes tasks from the pool's job queue until the queue is closed.
// id only labels the log output.
func (wp *WorkerPool) worker(id int) {
    for task := range wp.jobs {
        wp.runWithRetry(id, task)
    }
}

// runWithRetry runs task once and, while it keeps failing, retries it up to
// task.MaxRetries() more times with a 300ms pause before each retry. The task
// is marked as done on the pool's WaitGroup whatever the outcome.
func (wp *WorkerPool) runWithRetry(id int, task Task) {
    defer wp.wg.Done()

    limit := task.MaxRetries()
    for retries := 0; ; retries++ {
        err := task.Run()
        if err == nil {
            return
        }
        // Check the budget before announcing a retry, so the final failure is
        // not logged as a retry with a counter above the limit (e.g. "3/2").
        if retries >= limit {
            fmt.Printf("Worker %d: 任务最终失败放弃,错误:%v\n", id, err)
            return
        }
        fmt.Printf("Worker %d: 任务失败,重试 %d/%d,错误:%v\n", id, retries+1, limit, err)
        time.Sleep(300 * time.Millisecond)
    }
}

main.go

package main

// MyTask is a demo task that fails at random.
type MyTask struct {
    ID         int // identifier printed when the task succeeds
    RetryLimit int // value returned by MaxRetries
}

// Run simulates a flaky job: a random draw below 0.5 yields an error,
// otherwise a success line is printed and nil is returned.
func (t MyTask) Run() error {
    if roll := rand.Float32(); roll >= 0.5 {
        fmt.Printf("任务 %d 成功 ✅\n", t.ID)
        return nil
    }
    return errors.New("模拟失败")
}

// MaxRetries returns the task's configured RetryLimit.
func (t MyTask) MaxRetries() int {
    return t.RetryLimit
}

// main submits ten flaky tasks to a three-worker pool and waits for all of
// them to either succeed or run out of retries.
func main() {
    rand.Seed(time.Now().UnixNano())

    const (
        workers    = 3
        tasks      = 10
        retryLimit = 2
    )

    pool := NewWorkerPool(workers)
    pool.Start()
    for id := 1; id <= tasks; id++ {
        pool.Submit(MyTask{ID: id, RetryLimit: retryLimit})
    }
    pool.Wait()

    fmt.Println("所有任务处理完毕 ✅")
}

函数执行器 超时控制 + 重试机制

控制函数执行时间,超时自动终止
失败时自动重试,最多重试 N 次
可配置每次重试之间的间隔:每次间隔时间翻倍(指数退避),并叠加随机延迟,避免多个调用方同时重试(Equal Jitter):$\text{sleep} = \text{base} \times 2^{n-1} + \text{rand}(0,\ \text{base} \times 2^{n-1})$
可控制总超时时间或手动取消

retryable.go

package retry

import (
    "context"
    "errors"
    "math"
    "math/rand"
    "time"
)

// RetryableFunc is the operation executed by Do. It receives the context of
// the current attempt and returns nil on success.
type RetryableFunc func(ctx context.Context) error

// RetryStats summarises one call to Do; it is the value handed to
// RetryConfig.OnComplete.
type RetryStats struct {
    RetryCount    int           // retry counter maintained by Do
    TotalDuration time.Duration // time elapsed since Do was entered
    Success       bool          // true when the run ended without an error
    LastError     error         // last error recorded by Do; nil on success
}

// RetryConfig controls how Do retries a function.
type RetryConfig struct {
    // MaxRetries is the maximum number of retries.
    MaxRetries int
    // Timeout bounds each individual attempt.
    Timeout time.Duration
    // BaseInterval is the base interval of the backoff between attempts.
    BaseInterval time.Duration
    // MaxInterval is the maximum interval between attempts.
    MaxInterval time.Duration
    // OnComplete is an optional callback hook that receives the run's stats.
    OnComplete func(RetryStats)
}

func Do(ctx context.Context, fn RetryableFunc, cfg RetryConfig) error {
    start := time.Now()
    var lastErr error
    retryCount := 0
    
    for i := 0; i <= cfg.MaxRetries; i++ {
        select {
        case <-ctx.Done():
            return errors.New("任务被外部取消:" + ctx.Err().Error())
        default:
        }

        // 创建单次执行的超时控制
        attemptCtx, cancel := context.WithTimeout(ctx, cfg.Timeout)
        errChan := make(chan error, 1)

        go func() {
            errChan <- fn(attemptCtx)
        }()

        select {
        case <-attemptCtx.Done():
            lastErr = errors.New("执行超时:" + attemptCtx.Err().Error())
        case err := <-errChan:
            if err == nil {
                cancel()
                if cfg.OnComplete != nil {
                    cfg.OnComplete(RetryStats{
                        RetryCount:    retryCount,
                        TotalDuration: time.Since(start),
                        Success:       true
                        LastError:     nil,
                    })
                }
                return nil // 成功
            }
            retryCount++
            lastErr = err
        }

        cancel()

        // 不再重试就直接退出
        if i == cfg.MaxRetries {
            break
        }

        // 指数退避 sleep
        sleepDuration := cfg.BaseInterval * time.Duration(math.Pow(2, float64(i)))
        jitteredBackoff(cfg.BaseInterval, i, cfg.MaxInterval)
        if sleepDuration > cfg.MaxInterval {
            sleepDuration = cfg.MaxInterval
        }

        select {
        case <-ctx.Done():
            return errors.New("任务在重试等待中被取消")
        case <-time.After(sleepDuration):
            // 继续重试
        }
    }
    
    if cfg.OnComplete != nil {
        cfg.OnComplete(RetryStats{
            RetryCount:    retryCount,
            TotalDuration: time.Since(start),
            Success:       lastErr == nil,
            LastError:     lastErr,
        })
    }
    return lastErr
}

// jitteredBackoff returns the "equal jitter" delay for the given attempt.
//
// The nominal backoff is base * 2^attempt, capped at max. The result is half
// of that backoff plus a random amount in [0, half), so it always lies in
// [backoff/2, backoff).
//
// Degenerate inputs never panic: a non-positive base yields 0, a negative
// attempt is treated as 0, a non-positive max means "no cap", and a backoff
// too small to be halved is returned as is.
func jitteredBackoff(base time.Duration, attempt int, max time.Duration) time.Duration {
    if base <= 0 {
        return 0
    }
    if max <= 0 {
        max = time.Duration(1<<63 - 1)
    }
    if attempt < 0 {
        attempt = 0
    }

    // Start from the cap and only use base<<attempt when it provably fits
    // below it; comparing against max>>attempt avoids overflowing the shift.
    backoff := max
    if attempt < 63 && base <= max>>uint(attempt) {
        backoff = base << uint(attempt)
    }

    half := backoff / 2
    if half == 0 {
        // rand.Int63n panics on a non-positive argument.
        return backoff
    }
    return half + time.Duration(rand.Int63n(int64(half)))
}

main.go

package main

import (
    "context"
    "errors"
    "fmt"
    "math/rand"
    "time"

    "yourpkg/retry"
)

// main demonstrates retry.Do with a job that fails about 80% of the time,
// a one-second limit per attempt and an eight-second limit for the whole run.
func main() {
    rand.Seed(time.Now().UnixNano())

    // The outer context bounds the whole run; calling cancel manually would
    // abort it early.
    ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second)
    defer cancel()

    flakyJob := func(ctx context.Context) error {
        fmt.Println("执行任务中...")
        time.Sleep(300 * time.Millisecond)

        if rand.Float32() >= 0.8 {
            fmt.Println("任务成功!")
            return nil
        }
        return errors.New("随机失败")
    }

    report := func(stats retry.RetryStats) {
        fmt.Printf("📊 Retry Report:\n")
        fmt.Printf("重试次数: %d\n", stats.RetryCount)
        fmt.Printf("总耗时: %v\n", stats.TotalDuration)
        fmt.Printf("成功状态: %v\n", stats.Success)
        if stats.LastError != nil {
            fmt.Printf("最终错误: %v\n", stats.LastError)
        }
    }

    cfg := retry.RetryConfig{
        MaxRetries:   5,
        Timeout:      1 * time.Second,
        BaseInterval: 500 * time.Millisecond,
        MaxInterval:  4 * time.Second,
        OnComplete:   report,
    }

    if err := retry.Do(ctx, flakyJob, cfg); err != nil {
        fmt.Println("最终失败:", err)
        return
    }
    fmt.Println("任务成功完成 ✅")
}