1

『Go核心36讲』| 07 - go语句以及执行规则(下)

 2 years ago
source link: https://ijayer.github.io/post/tech/code/golang/tutorial-go36-07/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
  • Q1:用什么手段可以对 goroutine 的启用数量加以限制 ?
  • Q2:怎样才能让主 goroutine 等待其他 goroutine ?
  • Q3:怎样让我们启用的多个 goroutine 按照既定的顺序执行 ?

专栏:17 | go 语句及其执行规则(下)

@cabday Alfredsson

Q1:用什么手段可以对 goroutine 的启用数量加以限制 ?

自己尝试用 buffer channel 实现了一个比较陋的 Demo:

/*
 * 说明:用什么手段可以对 goroutine 的启用数量加以限制?
 * 作者:zhe
 * 时间:2019-01-17 20:12 PM
 * 更新:比较陋的实现。。。
 */

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// max worker
const (
	taskNum          = 1000000
	defaultMaxWorker = 100
)

// wg waiting for all goroutine finished.
var wg = sync.WaitGroup{}

// task and worker
var w = newWorker(defaultMaxWorker)
var t = newTask(w)

func main() {
	EnableGoPool()
}

func EnableGoPool() {
	t.add(taskNum)

	go t.watching()
	go t.produce(taskNum)

	t.wait()
}

// worker
type worker struct {
	max  uint64
	pool chan struct{} // buffered chan, len(cap) <= max
}

// newWorker
// max = 0, 表明只有一个 worker
func newWorker(max uint64) *worker {
	if max < 0 {
		max = defaultMaxWorker
	}
	return &worker{
		max:  max,
		pool: make(chan struct{}, max),
	}
}

// done
func (w worker) done() {
	<-w.pool
}

// task
type task struct {
	worker *worker
}

// newTask
func newTask(w *worker) *task {
	return &task{worker: w}
}

// add
func (t *task) add(num int) {
	wg.Add(num)
}

// produce 产出任务, 任务数量不得小于 1
func (t *task) produce(num int) {
	if num == 0 {
		num = 1 // 至少安排一个任务
	}

	for i := 1; i <= num; i++ {
		// 检查 pool 是否已经被沾满,如果已满则阻塞等待
		// 空闲的 worker;当有 worker 被释放时,继续执
		// 行后面的代码块,即分配新任务给空闲的 worker
		t.worker.pool <- struct{}{}

		go t.do(i) // do sth long-running
	}
}

// wait
func (t *task) wait() {
	wg.Wait()
}

// do
func (t task) do(i int) {
	defer t.done()

	// 模拟耗时操作
	time.Sleep(time.Millisecond)
	fmt.Printf("[task][%4v][%v] done.\n", i, utils.Now())
}

// done
func (t *task) done() {
	t.worker.done()
	wg.Done()
}

func (t *task) watching() {
	for {
		<-time.After(100 * time.Millisecond)
		fmt.Printf("go: %v\n", runtime.NumGoroutine())
	}
}

Q2:怎样才能让主 goroutine 等待其他 goroutine ?

先来看 Demo:

package main

import "fmt"

func main() {
	for i:=0; i<10; i++ {
		go func() {
			fmt.Println(i)
		}()
	}
}

这段代码运行后是不会有任何输出的,至于原因在 上一节已经分析过了, 你可以回过头在看看。现在我们来看看有那些方式可以实现让主 goroutine 等待其他 goroutine

A1:小睡一会儿

最简单粗暴的方式就是让主 goroutine “小睡” 一会儿,来改下代码看看

package main

import (
	"fmt"
	"time"
)

func main() {
	for i := 0; i < 10; i++ {
		go func() {
			fmt.Println(i)
		}()
	}
	time.Sleep(time.Millisecond * 500) // + 小睡会儿:让主 goroutine 暂停运行,等待恢复后会继续执行后边的代码
}

这个办法是可行的,只要 “睡眠” 的时间不要太短就好。不过,问题恰恰就在这里,我们让主 goroutine “睡眠” 多长时间才合适呢? 如果睡眠太短,则很可能不足以让其他的 goroutine 运行完毕,而若 “睡眠” 太长则纯属浪费时间,这个时间就太难把握了。

既然是这样,那会不会有更好的实现方法呢? 当然是有的啊,可以让其他 goroutine 在运行完毕的时候告诉我们一下就 🆗 了啊,来在改改代码看看

A2:让其他 goroutine 在运行完毕后通知主 goroutine

这里,你是否想到了通道呢?通道的长度应该与我们手动启用的 goroutine 数量一致。在每个手动启用的 goroutine 即将运行结束的时候,我们都要向该通道发送一个值。

package main

import (
    "fmt"
    "time"
)

func main() {
    n := 10
    done := make(chan struct{}, n)
    // 这里有个细节需要注意:
    // 我们在声明通道 done 的时候是以 chan struct{} 作为类型的。其中
    // 的类型字面量 struct{} 有些类似于空接口类型 interface{}, 它代
    // 表了既不包含任何字段也不拥有任何方法的空结构体类型
    //
    // 注意:
    // struct 类型值得表示法只有一个,即:struct{}{}。并且,它占用的
    // 内存空间是 0 字节。 确切的说,这个值在整个 Go 程序中永远都只会
    // 存在一份。虽然我们可以无数次地使用这个值字面量,但是用到的却都是
    // 同一个值。


    for i:=0; i<n; i++ {
        go func() {
            fmt.Println(i)
            done <- struct{}{}
        }()
    }

    for j:0; j<n; j++ {
        <- done
    }
}

Note:当我们仅仅把通道当作传递某种简单信号的介质的时候,用 struct{} 作为其元素类型是再好不过的了。

再看这个问题,想想有没有更好的答案? 可定也是有的,如果了解 sync 代码包的话,那么可能会想到 sync.WaitGroup 类型。 来看看代码实现

A3:使用 sync.WaitGroup

package main

import (
    "fmt"
    "sync"
)

func main() {
    n := 10

    wg := sync.WaitGroup{}
    wg.Add(n)

    for i:=0; i<n; i++ {
        go func() {
            fmt.Println(i)
            wg.Done()
        }()
    }

    wg.Wait()
}

Q3:怎样让我们启用的多个 goroutine 按照既定的顺序执行 ?

既然是按既定的顺序执行,那么肯定是要让异步发起的 go 函数得到同步的执行

Demo 实现:

/*
 * 说明:for 循环启用 10 个 goroutine 打印迭代变量的序号,怎么保证按自然数的顺序(0,1,2...) 输出
 * 作者:zhe
 * 时间:2019-01-15 10:10 PM
 * 更新:
 */

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

var cnt uint32

func main() {
	TestByTriggerFn()
	ImplementationByChan()
}

// ********************************************* 方式一:自旋(spinning)函数
func TestByTriggerFn() {
	InputNumPassByTrigger()
	trigger(10, func() {}) // 等待主 goroutine 结束
}

var trigger = func(i uint32, fn func()) {
	for {
		if n := atomic.LoadUint32(&cnt); n == i { // 自旋,直到 i 和 n 相等时
			// 才执行 if 里代码,n 从 0 起计数
			fn()
			atomic.AddUint32(&cnt, 1) // cnt 会在多 goroutine 间会产生竞态,所以这里采用原子操作
			break
		}
		time.Sleep(time.Nanosecond)
	}
}

func InputNumPassByTrigger() {
	for i := uint32(0); i < 10; i++ {
		go func(i uint32) {
			fn := func() { fmt.Println(i) }
			trigger(i, fn)
		}(i)
	}
}

// ********************************************* 方式二:用通道实现
func ImplementationByChan() {
	ch := make(chan int)
	go InputNum(ch)

	OutByOrder(ch)
	close(ch)
}

func InputNum(ch chan int) {
	for i := 0; i < 10; i++ {
		go func(i int) {
			ch <- i
		}(i)
	}
}

func OutByOrder(ch chan int) {
	cnt := 0
	for {
		select {
		case i := <-ch:
			if i == cnt { // cnt 从 0 开始递增 => 从 goroutine 接收的值也从 0 递增
				fmt.Println(i)
				cnt += 1
				continue
			}
			go func() { ch <- i }() // 不符合自然数顺序的重新放回通道中去
		default:
			if cnt == 10 { // cnt 计数到 10 说明接收完成,退出函数
				return
			}
		}
	}
}

See Also

Thanks to the authors 🙂


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK