1

golang 里面的超时取消

 2 years ago
source link: https://feiyang233.club/post/context/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

之前读书 《Go 语言并发之道》 里面有讲到 context 的超时取消,最近在工作上也遇到了一个问题和超时取消有关系的。

公众号那篇文章的有一句话,菜鸡深有同感:我记得我第一次接触context时,同事都说这个用来做并发控制的,可以设置超时时间,超时就会取消往下执行,快速返回,我就单纯的认为只要函数中带着context参数往下传递就可以做到超时取消,快速返回。
我曾经也是这样认为的,哈哈哈。

kafka write 超时

今年从 devops 转 backend 后端开发,遇到一个 kafka write 超时的问题。

err = w.writer.Save(castedEvent)

这里调用 kafka team 的 client 写入, 但是 client 的 timeout 是 30 s , 重试 3 次,再加上 broker 比较多,等待的时间就会更久。
Net.[Dial|Read]Timeout = 30s
Metadata.Retry.Max=3
Metadata.Retry.Backoff=250ms
for example: a producer with 1 broker Addr and 6 partitions (intotal 6+1 brokers considered in the library) the processing time is abt 7-8 min for the first msg to return error msg
Note:

  • when there are more brokers and partitions, the processing time of the message will be multiplied
  • if the timeout of the rest api is shorter than the producer processing time, there might not be any success/error msg returned and displayed on datadog

交代完问题的背景,我就开始思考如何才能提前取消,而不是等待 七八分钟。

single gorutine

因为工作上那段代码逻辑只有一个 gorutine, 我就尽量想用一个简化版的例子实现超时取消。
《Go 语言并发之道》 关于 context 超时取消给我印象最深刻的一句话: goroutine 中任何阻塞操作都必须是可抢占的 ,以便它可以被取消。
我就在思考,kafka 在写入的时候,在哪一步可以被抢占呢? 我猜测应该是尝试写之后,如果失败了,就 backoff 在重试,这一段贤者时间,可以被抢占。

package main

import (
"context"
"log"
"math/rand"
"time"
)

func kafkaWrite() bool {
i := rand.Intn(2)
log.Println(i)
if i == 0 {
return true
} else {
return false
}
}

func tryWrite(ctx context.Context) error {
for {
result := kafkaWrite()
if result {
return nil
}
select {
case <-ctx.Done():
log.Println("timeout exit")
return ctx.Err()

default:
time.Sleep(1 * time.Second)
}
}
}

func main() {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
err := tryWrite(ctx)
if err != nil {
log.Println(err)
} else {
log.Println("Success to process in time")
}

}

这里我们 granular 很细了,也用到了 context。 但是实际上我们在调用 kafka SDK 的时候,并不能把 context 传进去。

multiple gorutine

在咨询大佬 jiahao 后,他给出了另外一种方法,多用一个 channel,监听 write 是否完成。而且也不需要传 context 到 kafka write 代码里面。

package main

import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"runtime"
"time"
)

func kafkaWrite() error {
i := rand.Intn(2)
log.Println(i)
if i == 0 {
return nil
} else {
for i := 0; i < 10; i++ {
log.Println("sleeping", i)
time.Sleep(1 * time.Second)
}
return errors.New("write error")
}

}

func tryWrite(ctx context.Context) error {
jobDone := make(chan struct{})
go func() {
err := kafkaWrite()
if err != nil {
log.Println("failed to write")
} else {
//jobDone <- struct{}{}
close(jobDone)
}
}()

select {
case <-ctx.Done():
log.Println("timeout exit")
return ctx.Err()
case <-jobDone:
log.Println("job Done")
return nil
}

}

func main() {
go func() {
for i := 0; i < 15; i++ {
fmt.Printf("gorutine number is: %v\n", runtime.NumGoroutine())
time.Sleep(1 * time.Second)
}
}()
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
err := tryWrite(ctx)
if err != nil {
log.Println(err)
} else {
log.Println("Success to process in time")
}

time.Sleep(10 * time.Second)
}

这个确实能提前取消,但那个 gorutine 会一直运行,直到 timeout 才消亡。 我们在这里专门用一个 gorutine 去监控当前 gorutine 数量 runtime.NumGoroutine

gorutine number is: 2
2022/05/17 20:41:15 1
2022/05/17 20:41:15 sleeping 0
gorutine number is: 3
2022/05/17 20:41:16 sleeping 1
2022/05/17 20:41:17 sleeping 2
gorutine number is: 3
2022/05/17 20:41:18 timeout exit
2022/05/17 20:41:18 context deadline exceeded
gorutine number is: 3
2022/05/17 20:41:18 sleeping 3
gorutine number is: 3
2022/05/17 20:41:19 sleeping 4
gorutine number is: 3
2022/05/17 20:41:20 sleeping 5
gorutine number is: 3
2022/05/17 20:41:21 sleeping 6
gorutine number is: 3
2022/05/17 20:41:22 sleeping 7
gorutine number is: 3
2022/05/17 20:41:23 sleeping 8
2022/05/17 20:41:24 sleeping 9
gorutine number is: 3
2022/05/17 20:41:25 failed to write
gorutine number is: 2
gorutine number is: 2
gorutine number is: 2

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK