Errgroup 的应用场景

总结摘要
errgroup 主要解决两类场景的问题: 快速失败模式:任一任务出错时,立即终止所有任务并返回错误(适合任务强依赖场景)。 全量执行模式:所有任务无论成功与否都执行完毕,最终汇总所有错误(适合任务独立场景)。

Go 并发任务管理:errgroup 两种核心模式实战指南

一、背景与现状描述

在 Go 语言并发编程中,我们经常需要处理「一组并行任务」,并面临两个核心问题:

  1. 如何等待所有任务完成
  2. 如何处理任务执行中出现的错误

现有方案的局限

  • sync.WaitGroup:仅能等待任务完成,无法传递错误,需额外通过 channel 或共享变量收集错误,代码繁琐。
  • 手动管理 goroutine + channel:可实现错误传递,但需手动处理 goroutine 生命周期、错误聚合、退出信号等,易出现漏处理(如 goroutine 泄漏)。
  • errgroup 的价值:作为官方扩展库(golang.org/x/sync/errgroup)提供的工具,它在 WaitGroup 基础上封装了错误传递和上下文管理能力,大幅简化并发任务的错误处理逻辑。

二、errgroup 核心能力概述

errgroup 主要解决两类场景的问题:

  1. 快速失败模式:任一任务出错时,立即终止所有任务并返回错误(适合任务强依赖场景)。
  2. 全量执行模式:所有任务无论成功与否都执行完毕,最终汇总所有错误(适合任务独立场景, 例如检测节点健康状态)。

其核心结构体为 errgroup.Group,提供两个关键方法:

  • Go(f func() error):提交一个并发任务(函数返回错误)。
  • Wait() error:等待所有任务完成,返回第一个非空错误(或 nil)。

三、场景一:快速失败模式(使用 errgroup.WithContext

3.1 场景定义

当一组任务中,任一任务失败会导致其他任务失去执行意义时(如分布式事务、多步骤依赖的接口调用),需要“一错全停”,避免资源浪费。

3.2 实现原理

通过 errgroup.WithContext(context.Background()) 创建带上下文的 Group

  • 当任一任务返回错误时,Group 会自动取消关联的上下文(ctx)。
  • 其他任务可通过监听 ctx.Done() 提前感知取消信号,终止执行。

3.3 实战案例:分布式配置加载

3.3.1 需求

从 3 个依赖的配置中心(A、B、C)并发加载配置,任一加载失败则整体失败,避免使用不完整的配置。

3.3.2 代码实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"sync"
	"time"

	"golang.org/x/sync/errgroup"
)

// Config 配置数据结构
type Config struct {
	Source string // 配置来源(A/B/C)
	Data   map[string]string
}

// 从配置中心加载数据(模拟可能失败)
func loadConfig(ctx context.Context, source string, delay time.Duration) (Config, error) {
	// 模拟网络请求,同时监听上下文取消
	select {
	case <-ctx.Done():
		return Config{}, fmt.Errorf("配置中心 %s 加载被终止:%w", source, ctx.Err())
	case <-time.After(delay):
		// 模拟随机失败(仅配置中心B有30%概率失败)
		if source == "B" && time.Now().UnixNano()%10 < 3 {
			return Config{}, errors.New("连接超时")
		}
		return Config{
			Source: source,
			Data:   map[string]string{"timeout": "30s", "maxConn": "100"},
		}, nil
	}
}

func main() {
	// 1. 创建带上下文的errgroup(快速失败核心)
	g, ctx := errgroup.WithContext(context.Background())

	// 2. 并发安全存储配置结果
	var (
		configs []Config
		mu      sync.Mutex
	)

	// 3. 定义任务列表(3个配置中心)
	sources := []struct {
		name  string
		delay time.Duration
	}{
		{"A", 100 * time.Millisecond},
		{"B", 150 * time.Millisecond}, // 可能失败的节点
		{"C", 200 * time.Millisecond},
	}

	// 4. 提交任务
	for _, s := range sources {
		source := s // 捕获循环变量,避免闭包陷阱
		g.Go(func() error {
			cfg, err := loadConfig(ctx, source.name, source.delay)
			if err != nil {
				return fmt.Errorf("配置中心 %s 加载失败:%w", source.name, err)
			}
			// 安全添加结果
			mu.Lock()
			configs = append(configs, cfg)
			mu.Unlock()
			return nil
		})
	}

	// 5. 等待所有任务完成并处理结果
	if err := g.Wait(); err != nil {
		log.Fatalf("配置加载失败:%v", err)
	}

	// 6. 所有配置加载成功
	fmt.Println("所有配置加载成功:")
	for _, cfg := range configs {
		fmt.Printf("来源 %s: %+v\n", cfg.Source, cfg.Data)
	}
}

3.3.3 执行结果分析

  • 失败场景:若配置中心 B 失败,ctx 被取消,配置中心 C 会因 <-ctx.Done() 提前终止,g.Wait() 返回 B 的错误。
  • 成功场景:所有配置中心加载完成,汇总结果并输出。

四、场景二:全量执行模式(不使用 errgroup.WithContext

4.1 场景定义

当任务之间相互独立,即使部分任务失败,仍需执行所有任务并汇总所有错误时(如批量数据校验、多节点健康检查),需要“全量执行,集中报错”。

4.2 实现原理

直接创建 errgroup.Group(不关联上下文):

  • 任务失败不会触发其他任务终止,所有任务都会执行完毕。
  • 通过自定义错误收集器(如封装锁和切片的结构体)收集所有错误。

4.3 实战案例:多节点健康检查

4.3.1 需求

检查 5 个服务节点的健康状态,无论部分节点是否异常,都需等待所有检查完成,最终汇总异常节点信息。

4.3.2 代码实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/errgroup"
)

// 1. 错误收集器:并发安全的错误管理
type ErrorCollector struct {
	mu   sync.Mutex
	errs []error
}

func NewErrorCollector() *ErrorCollector {
	return &ErrorCollector{}
}

// 添加错误(忽略空错误)
func (c *ErrorCollector) Add(err error) {
	if err == nil {
		return
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.errs = append(c.errs, err)
}

// 获取所有错误(返回副本,避免外部修改)
func (c *ErrorCollector) AllErrors() []error {
	c.mu.Lock()
	defer c.mu.Unlock()
	copied := make([]error, len(c.errs))
	copy(copied, c.errs)
	return copied
}

// 2. 节点健康检查任务
type NodeCheckTask struct {
	NodeName string        // 节点名称
	Timeout  time.Duration // 检查超时时间
}

// 执行健康检查(模拟可能失败)
func (t *NodeCheckTask) Check() error {
	time.Sleep(t.Timeout) // 模拟检查耗时
	// 模拟随机失败(20%概率)
	if time.Now().UnixNano()%10 < 2 {
		return errors.New("心跳无响应")
	}
	fmt.Printf("节点 %s 健康检查通过\n", t.NodeName)
	return nil
}

func main() {
	// 1. 初始化组件
	errCollector := NewErrorCollector()
	var g errgroup.Group

	// 2. 定义检查任务列表
	nodes := []NodeCheckTask{
		{"node-1", 100 * time.Millisecond},
		{"node-2", 150 * time.Millisecond},
		{"node-3", 200 * time.Millisecond},
		{"node-4", 180 * time.Millisecond},
		{"node-5", 220 * time.Millisecond},
	}

	// 3. 提交所有检查任务
	for _, node := range nodes {
		task := node // 捕获循环变量
		g.Go(func() error {
			if err := task.Check(); err != nil {
				// 包装错误并添加到收集器
				wrappedErr := fmt.Errorf("节点 %s 异常:%w", task.NodeName, err)
				errCollector.Add(wrappedErr)
				return wrappedErr // 不影响其他任务
			}
			return nil
		})
	}

	// 4. 等待所有任务完成(忽略g.Wait()返回的第一个错误)
	_ = g.Wait()

	// 5. 汇总结果
	if len(errCollector.AllErrors()) == 0 {
		fmt.Println("\n所有节点健康检查通过")
		return
	}

	// 输出所有异常
	fmt.Printf("\n共 %d 个节点异常:\n", len(errCollector.AllErrors()))
	for _, err := range errCollector.AllErrors() {
		fmt.Printf("- %v\n", err)
	}
}

4.3.3 执行结果分析

  • 无论多少节点失败,所有 5 个节点的检查都会执行完毕。
  • 最终输出所有异常节点的错误信息,便于运维人员批量处理。

五、两种模式对比与最佳实践

维度快速失败模式(WithContext全量执行模式(无 WithContext
核心目标一错全停,减少无效执行全量执行,汇总所有错误
上下文依赖依赖 errgroup 生成的 ctx无(或自定义上下文)
错误处理只返回第一个错误收集所有错误
适用场景任务强依赖(如分布式事务)任务独立(如批量检查)
代码关键差异使用 g, ctx := errgroup.WithContext(...)直接声明 var g errgroup.Group

最佳实践

  1. 错误包装:始终使用 fmt.Errorf("%w", err) 包装错误,保留错误链和上下文(如任务名称、来源)。
  2. 并发安全:共享资源(如结果切片)必须通过 sync.Mutex 或 channel 保证安全,避免数据竞争。
  3. 闭包陷阱:循环提交任务时,需通过 task := t 捕获当前变量副本,防止所有 goroutine 引用同一变量。
  4. 上下文监听:快速失败模式中,任务函数需通过 select <-ctx.Done() 监听取消信号,及时退出。

六、总结

errgroup 作为 Go 并发编程的高效工具,通过两种核心模式解决了不同场景下的任务管理问题:

  • 快速失败模式适合任务强依赖场景,通过上下文取消机制实现“一错全停”,减少资源浪费。
  • 全量执行模式适合任务独立场景,通过自定义错误收集器实现“全量执行,集中报错”,满足批量处理需求。