6

Golang中的SingleFlight与CyclicBarrier

 3 years ago
source link: https://mp.weixin.qq.com/s?__biz=MzI3MzQ3NDMzNw%3D%3D&%3Bmid=2247483948&%3Bidx=1&%3Bsn=2ebf04c76f56544842b4759075d8029d
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

  SingleFlight将并发请求合并成一个请求,可用于减少下游压力;CyclicBarrier可重用栅栏并发原语,控制一组请求同时执行;

SingleFlight

  在Go中SingleFlight并不是原生提供的,而是开发组提供的扩展并发原语。它可实现多个goroutine调用通过一函数时,只让一个goroutine调用该函数,等到该goroutine调用函数返回结果时再将结果返回给其他同时调用的goroutine,从而做到了减少并发调用的次数;

  在 秒杀缓存 等场景下SingleFlight作用很明显,能够大规模的减少并发数量 避免缓存穿透系统崩溃 等。将多个并发请求 合并成一,瞬间将下游系统压力从 N减少到1

func flightDemo() {
    key := "flight"
	for i := 0; i < 5; i++ {
            log.Printf("ID: %d 请求获取缓存", i)
            go func(id int) {
		value, _ := getCache(key, id)
		log.Printf("ID :%d 获取到缓存 , key: %v,value: %v", id, key, value)
	    }(i)
	}
        time.Sleep(20 * time.Second)
}

func getCache(key string, id int) (string, error) {
	var ret, _, _ = group.Do(key, func() (ret interface{}, err error) {
		time.Sleep(2 * time.Second)//模拟获取缓存
		log.Printf("ID: %v 执行获取缓存", id)
		return id, nil
	})
	return strconv.Itoa(ret.(int)), nil
}

执行结果:

2020/12/14 14:35:13 ID: 0 请求获取缓存
2020/12/14 14:35:13 ID: 1 请求获取缓存
2020/12/14 14:35:13 ID: 2 请求获取缓存
2020/12/14 14:35:13 ID: 3 请求获取缓存
2020/12/14 14:35:13 ID: 4 请求获取缓存
2020/12/14 14:35:15 ID: 0 执行获取缓存
2020/12/14 14:35:15 ID :0 获取到缓存 , key: flight,value: 0
2020/12/14 14:35:15 ID :2 获取到缓存 , key: flight,value: 0
2020/12/14 14:35:15 ID :4 获取到缓存 , key: flight,value: 0
2020/12/14 14:35:15 ID :3 获取到缓存 , key: flight,value: 0
2020/12/14 14:35:15 ID :1 获取到缓存 , key: flight,value: 0

  这个Demo中有五个goroutine同时发起获取key为flight的缓存,由于使用了SingleFlight对象,ID为0的请求率先发起了获取缓存,其他4个goroutine并不会去执行获取缓存请求逻辑, 而是等到ID为0的请求取得到结果后直接使用该结果

SingleFlight内部使用了互斥锁Mutex与Map实现,Mutex用于提供并发时的读写保护,Map用于保存同一个key的处理请求;SingleFlight提供了如下三个方法:

Do:执行一个函数,返回函数的执行结果;

DoChan:与Do方法类似,返回的是一个chan,函数fn执行完成产生结果后,可从chan中接受到函数的执行结果;

Forget:丢弃某个key,之后这个key请求会继续执行函数fn,不在等待前一个请求fn函数的执行结果;

  SingleFlight的实现部分代码如下,其中call为具体的的请求、Group代表Singleflight、map[string]*call用于存储相对应的key所发起的请求;

type call struct {
   wg  sync.WaitGroup
   val interface{}
   err error
}

type Group struct {
   mu sync.Mutex       // protects m
   m  map[string]*call // lazily initialized
}

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	//是否已存在该key的请求
	if c, ok := g.m[key]; ok {
 		c.dups++
		g.mu.Unlock()
		c.wg.Wait()    //等待该key第一个请求完成
		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
		return c.val, c.err, true   //返回该key第一个请求的结果
	}
	c := new(call)   //第一个请求
	c.wg.Add(1)
	g.m[key] = c     //将请求加入到map中
	g.mu.Unlock()
	g.doCall(c, key, fn) //调用函数fn
	return c.val, c.err, c.dups > 0
}

CyclicBarrier

  在Go的标准库中、开发组扩展库中其实也并没有CyclicBarrier的实现,有个第三方的CyclicBarrier实现:https://github.com/marusama/cyclicbarrier, 它的逻辑为:一组goroutine彼此等待直到所有的goroutine都达到某个执行点,再往下执行。就如栅栏一样等指定数量的人到齐了,开始抬起栅栏放行;它的执行逻辑与Java的cyclicbarrier类似;

  在Go标准库中有个对象有类似的功能:WaitGroup,但该对象并没有CyclicBarrier那么简单易用;

func cyclicBarrierDemo(){
	for i := 0; i < 3; i++ {
		go func(id int) {
            log.Printf("start: %v", id)
		barrier.Await(context.Background())
			log.Printf("finish: %v", id)		
                }(i)
        }

	time.Sleep(5 * time.Second)
	log.Printf("完成")
}

执行结果:

2020/12/14 15:11:57 start: 2
2020/12/14 15:11:57 start: 0
2020/12/14 15:11:57 start: 1
2020/12/14 15:11:57 finish: 1
2020/12/14 15:11:57 finish: 2
2020/12/14 15:11:57 finish: 0
2020/12/14 15:12:02 完成

  通过上面Demo可以看到ID为2、0的goroutine输出start后并没有继续往下执行,而是等到ID为0的goroutine执行到start后三个goroutine一起往下执行;

  如没有使用栅栏,则这个Demo的执行结果如下:

2020/12/14 15:09:02 start: 0
2020/12/14 15:09:02 finish: 0
2020/12/14 15:09:02 start: 1
2020/12/14 15:09:02 finish: 2
2020/12/14 15:09:02 start: 2
2020/12/14 15:09:02 finish: 2
2020/12/14 15:09:07 完成

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK