7

Goroutine并发编程注意事项

 3 years ago
source link: https://driverzhang.github.io/post/goroutine%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B%E6%B3%A8%E6%84%8F%E4%BA%8B%E9%A1%B9/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

goroutine并发编程注意事项

总结在实际编写代码中,使用 goroutine 协程时的注意要点和一些编写规范。

在实践业务代码中,我们会经常看见启用 goroutine 方式: go func() {“这都是你业务代码,代码中没出现任何管控方式!”},然后就完事了,这里绝对不推荐这种方式,因为没有满足下面的三个条件:

1. 并发业务内容应该让调用者来开启go func(){} 或 go func_xxx() 显示的声明来开启协程。(有利于直观维护性)

2. 开启goroutine 协程跑业务代码,开启者你必须清楚的知道它的生命周期。(什么时候结束,考虑会不会长时间hang住,或者考虑有没有什么异常情况你无法控制)

3. 开启协程必定要加,超时控制退出 或 控制 channel 通知退出。(防止内存泄露出现 野G)

注意:也许有人会说,不好意思,我就没有控制生命周期线上依旧正常运行啊。

这里只能说,线上跑起来正常不表示你代码就没有BUG,只不过没有满足触发你BUG的条件摆了~

goroutine 开启的集几种姿势和控制方案

1. 直接协程go 匿名函数func()方式:

简单demo如下:

// 5秒超时控制避免 野G
ctxTime, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

go func() {
	errCtx := make(chan error)
	// 调用业务函数返回 error 一个参数,当然这里你也可以返回自己定义的response struct
	errCtx <- qwc.InsertCallbackContent(ctxTime, qwCallbackService.InsertCallbackContentReq{})

	select {
	case <-ctxTime.Done():
		log.Error("InsertCallbackContent:5秒超时")
		return
	case err := <-errCtx:
		if err != nil {
			logger.Errorf("TwEventCallback插入回调记录失败:%v", err.Error())
		}
		return
	}
}()

2. 直接 go func_xx(params) 无返回结果方式:

首先这个方式与上面区别就很大了,没有返回结果,那你如何控制你的 goroutine 的生命周期呢?

代码demo如下:

案例场景描述1:

一个最基本没有返回结果的情况下,如何进行goroutine生命周期控制,如下:

比如 程序中有 EventBus 事件上报执行对应的事件逻辑,但我们希望它是在我主业务的逻辑的旁支异步去执行,不应该阻塞的主业务浪费时间。(其实这里会演变成场景2)

func main() {
		// 这里调用一个eventbus事件
		eventbus := &App{}
		eventbus.Event("this is eventbus")

		// do ...常驻进程挂起操作
	  c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
		for {
			si := <-c
			switch si {
			case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
				return eventbus.shutdown()
			default:
				return nil
			}
		}
}

type App struct {
	wg *sync.WaitGroup
}

func(a *App) shutdown() {
	// 各种平滑退出接口调用
	// 这里去调用一个 等待所有goroutine执行完毕 
	eventbus.WaitAsync()
}

func(a *App) Event(data string) {
	a.wg.Add(1) // 这里一定要写在goroutine外部
	go func(){
		defer a.wg.Done()
		time.Sleep(xxx)
		fmt.Println(data)
	}()
}

func(a *App) WaitAsync() {
	a.wg.Wait()
}

上面这段代码,是一个功能完善但是并不完美的平滑退出goroutine,但是你得等待执行完毕。

所以它的缺陷就是,如果其中某event一直在挂起,难不成我一直等嘛?还不是要泄内存露。

所以我们来看看完善版demo:

func TestEvent(t *testing.T) {
	tx := NewApp(10)
	go tx.Run()
	_ = tx.Event(context.Background(), "test1")
	_ = tx.Event(context.Background(), "test2")
	_ = tx.Event(context.Background(), "test3")
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	defer cancel()

	// 加入超时控制,而不是死等所有任务执行完毕
	tx.shutdown(ctx)
}

func NewApp(chNum int) *App {
	return &App{
		// 设置待缓存的 channel
		// 这里 直接选择使用通道到代替 新开 goroutine 降低资源消耗
		ch: make(chan string, chNum),
	}
}

type App struct {
	ch   chan string
	stop chan struct{}
}

func (a *App) Event(ctx context.Context, data string) error {
	select {
	case a.ch <- data:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (a *App) Run() {
	// 等待通道里的消息传入即可
	// 一旦 channel 被close后,就会跳出for range
	for data := range a.ch {
		// 执行你的注册 event 逻辑
		time.Sleep(1 * time.Second)
		fmt.Println(data)
	}
	// 执行完成后发送通道消息
	a.stop <- struct{}{}
}

func (a *App) shutdown(ctx context.Context) {
	close(a.ch)
	select {
	case <-a.stop:
	case <-ctx.Done():
	}
}

这段demo2 和上面的demo1 差别还是比较明显的。

利用 channel 代替了 goroutine,并且取消了 WaitGroup 用于的等待所有 goroutine 的运行。

代替方法就是在 Run() 函数中了。

在demo2中你得自己预估好你的超时时间具体设定的值,结合你的业务事件设置合理的超时控制时间即可。不然你就要一直等待~~



About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK