8

Go语言基础(五)—— 并发编程

 4 years ago
source link: https://studygolang.com/articles/27103
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

前言:

647

目录如下:

Go语言基础(一)—— 简介、环境配置、HelloWorld

Go语言基础(二)—— 基本常用语法

Go语言基础(三)—— 面向对象编程

Go语言基础(四)—— 优质的容错处理

Go语言基础(五)—— 并发编程

Go语言基础(六)—— 测试、反射、Unsafe

Go语言基础(七)—— 架构 & 常见任务

Go语言基础(八)—— 性能调优

本篇将介绍如下内容:

1.协程机制( Groutine

2.共享内存并发机制(协程安全)

3.CSP并发机制( channel

4.多路选择和超时控制( select

5.channel的关闭和广播( channel

6.任务的取消

7.Context与关联任务取消

8.常见并发任务(实战)

一、协程机制

相信大家肯定都知道 “线程”“进程” 的概念。

而在Go语言中,“协程”可以理解为更轻量级的线程。 通过调度“协程”就可以把系统Kernel的效率发挥到极致。

通过一张表格,我们来对比一下协程与线程的区别。

  • Thread vs. Groutine:
\ 默认栈大小(创建时) KSE对应关系(Kernel Space Entity) 线程 Thread 1M 1 : 1 协程 Groutine 2K M : N UnQni2Q.png!web

协程vs.线程的优势在于:

  • 线程之间的切换会牵扯到内核中系统线程( kernel entity )的切换,这会造成较大的成本。
  • 而多个协程在同一个系统线程( kernel entity )下切换,就能降低切换系统线程( kernel entity )的成本。(如上图所示)

协程的使用:

语法: go + func

func TestGroutine(t *testing.T) {
	for i := 0; i < 10; i++ {
		go func(i int) {
			fmt.Println(i) // 正确案例,值传递。各个协程无竞争关系。
		}(i)

		// go func() {
		// 	fmt.Println(i) // 错误案例,共享变量。各个协程有竞争关系
		// }()
	}
	time.Sleep(time.Millisecond * 50)
}
复制代码

二、共享内存并发机制(协程安全)

说到协程安全,我们第一个会想到的就是加锁(lock)。 通过加锁来保证协程安全。

在Go语言中也是如此,我们来看个例子。

  • 协程并发,导致的协程不安全:
// 协程不安全demo
func TestCounterThreadUnsafe(t *testing.T) {
	counter := 0
	for i := 0; i < 5000; i++ {
		go func() {
			counter++
		}()
	}
	time.Sleep(1 * time.Second)
	t.Logf("counter = %d", counter)
}
复制代码

结果如下:

=== RUN   TestCounterThreadUnsafe
--- PASS: TestCounterThreadUnsafe (1.00s)
    share_mem_test.go:18: counter = 4765
复制代码

这时就会发现,计算错误,因为并发导致了漏值。

  • 解决方式一: 普通加锁,并加延迟等待协程执行完毕(不推荐)
// 协程等待demo(停1秒,不推荐)
func TestCounterThreadSafe(t *testing.T) {
	var mut sync.Mutex
	counter := 0
	for i := 0; i < 5000; i++ {
		go func() {
			defer func() {
				mut.Unlock() //函数调用完成后:解锁,保证协程安全
			}()
			mut.Lock() // 函数将要调用前:加锁,保证协程安全
			counter++
		}()
	}
	time.Sleep(1 * time.Second) // 等待一秒,等协程全部执行完
	t.Logf("counter = %d", counter)
}
复制代码

结果如下:

=== RUN   TestCounterThreadSafe
--- PASS: TestCounterThreadSafe (1.01s)
    share_mem_test.go:35: counter = 5000
复制代码

结果正确,但是有一个问题。因为这里有个1秒的延迟等待,保证协程运行完毕再调用结果。因此,有没有更好的处理方式呢?接下来我们再优化一下。

  • 解决方式二: 推荐! 使用同步等待队列( WaitGroup )保证顺序执行。
// 协程安全Demo
func TestCounterWaitGroup(t *testing.T) {
	var mut sync.Mutex    // 互斥锁
	var wg sync.WaitGroup // 等待队列
	counter := 0
	for i := 0; i < 5000; i++ {
		wg.Add(1) // 加个任务
		go func() {
			defer func() {
				mut.Unlock() //函数调用完成后:解锁,保证协程安全
			}()
			mut.Lock() // 函数将要调用前:加锁,保证协程安全
			counter++
			wg.Done() // 做完任务
		}()
	}
	wg.Wait() //等待所有任务执行完毕
	t.Logf("counter = %d", counter)
}
复制代码

运行结果如下:

=== RUN   TestCounterWaitGroup
--- PASS: TestCounterWaitGroup (0.00s)
    share_mem_test.go:55: counter = 5000
复制代码

这样的话,可以看出:互斥锁 Mutex 和等待队列 WaitGroup 不仅保证了协程的安全,还避免了提前打印结果。(:heavy_check_mark:)

三、CSP并发机制

1. CSP

CSP( Communicating sequential processes ):通信顺序进程(管道通信)。 简单来说,CSP是通过 Channel (管道)来通信的。

Go 中的 Channel (管道)有容量限制并且独立于处理 Groutine (协程)。

2. Channel

Go中常见的 Channel 有两种,分别对应为 ChannelBuffer Channel

  • 第一种:Channel(无缓冲)
VvYjmeR.png!web

首先,发送者与接受者必须同时站在 Channel 的两端才进行交互。 如果一方不在,另一方就会阻塞在一端,直到两端都在才进行交互。

创建语法: make(chan [type])

retChannel := make(chan string) // 创建无缓冲channel,并指明channel中的数据为string,双端等待
复制代码

输入语法: channel <-

channel <- object // channel输入
复制代码

获取语法: <- channel

object <- channel // channel输出
复制代码
  • 第二种:Buffer Channel(有缓冲)
i2MnUfz.png!web

这是一种稍微高级一点的 Channel 方式,(更加松耦合)。

首先,给 Channel 设置一个容量大小,并且不要求发送者与接受者同时站在两端。 然后,发送者会以 Buffer 的形式,不断往 Channel 里发送消息。 直到 Channel 的容量满了才阻塞。 这时,只要接受方接收了消息(即 Channel 有剩余容量了),发送者就会继续发送消息。

创建语法: make(chan [type], Int)

retChannel := make(chan string, 1) // 创建有缓冲channel,并指明channel中的数据为string
复制代码

输入语法: channel <-

channel <- object // channel输入
复制代码

获取语法: <- channel

object <- channel // channel输出
复制代码

Demo:模拟了一个网络请求的方法调用过程,通过 Channel 来控制当前协程在网络请求的等待过程中,去执行别的任务。

// 模拟网络请求
func serviceTask() string {
	fmt.Println("- start working on service task.")
	time.Sleep(time.Millisecond * 50)
	return "- service task is Done."
}

// 别的任务
func otherTask() {
	fmt.Println("start working on something else.")
	time.Sleep(time.Millisecond * 100)
	fmt.Println("other task is Done.")
}

// csp异步管道
func AsyncService() chan string {
	retChannel := make(chan string) // 无缓冲channel,创建并指明channel中的数据为string,双端等待
	// retChannel := make(chan string, 1) // 有缓冲channel,创建并指明channel中的数据为string
	go func() {
		ret := serviceTask()
		fmt.Println("returned result.")
		retChannel <- ret // channel输入
		fmt.Println("service exited.")
	}()
	return retChannel
}

func TestAsyncService(t *testing.T) {
	retCh := AsyncService()
	otherTask()
	fmt.Println(<-retCh) // channel输出
	time.Sleep(time.Second * 1)
}
复制代码

四、多路选择和超时控制

使用 select 关键字,完成“多路选择”与“超时控制”。

  • 多路选择: 当返回的 channel 可能有多个时,可以使用select来处理多路的响应事件。

注意:这里与 switch 有点像,但是要注意的是,它并不是顺序判断的。也就是如果 channel1channel2 同时满足时,可能走的是 channel1 、也可能是 channel2 ,并不像 switch 一样做顺序的判断。

Demo:

select {
	case ret := <-channel1: 
		t.Log(ret)
	case ret:= <- channel2:
		t.Log(ret)
	case default:
		t.Error("No one returned.")
	}
复制代码
  • 超时控制:

同时,我们也可以设置一个超时等待的一个分路,当 channel 超时还未返回时,可以执行相应的代码。

Demo:

select {
	case ret := <-AsyncService(): //正常返回
		t.Log(ret)
	case <-time.After(time.Millisecond * 100): // 超时等待
		t.Error("time out")
	}
复制代码

五、channel的关闭和广播

要点如下:

  1. 向已经 closechannel 发消息,会导致程序 panic
  2. v, ok <- channel 。 其中, okbool 值, 若 ok==true 时,表示 channel 处于 open 状态。 若 ok==false 时,表示 channel 处于 close 状态。
  3. 所有 channel 接收者在 channel 关闭时,都会立刻从阻塞等待中返回,且 ok 值为 false 。(PS:广播机制,通常被利用向多个订阅者同时发送信号。如,退出信号。)

Demo:

// 消息生产者
func dataProducer(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for i := 0; i < 10; i++ {
			ch <- i
		}

		fmt.Println("channel close.")
		close(ch) // 关闭channel

		wg.Done()
	}()
}

// 消息接收者
func dataReceiver(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for {
			if data, ok := <-ch; ok { // 有消息就打印,直到channel被close。
				fmt.Println(data)
			} else {
				fmt.Println("Receiver close.")
				break // channel被close
			}
		}
		wg.Done()
	}()
}

func TestCloseChannel(t *testing.T) {
	var wg sync.WaitGroup
	ch := make(chan int)
	wg.Add(1)
	dataProducer(ch, &wg) // 开启生产者
	wg.Add(1)
	dataReceiver(ch, &wg) // 开启消费者
	wg.Wait()
}
复制代码

六、任务的取消

通过上面的 close channel (广播机制),我们可以延伸一下,通过 close channel 通知所有 channel 取消当前的任务。

Demo如下:

func isCancelled(cancelChan chan struct{}) bool {
	select {
	case <-cancelChan:
		return true
	default:
		return false
	}
}

// 只能取消单个channel
func cancel_1(cancelChan chan struct{}) {
	cancelChan <- struct{}{}
}

// 所有channel全部取消
func cancel_2(cancelChan chan struct{}) {
	close(cancelChan)
}

func TestCancel(t *testing.T) {
	cancelChan := make(chan struct{}, 0) // 创建了一个channal,通过它来控制事件取消
	for i := 0; i < 5; i++ {             // 开启5个协程
		go func(i int, chanclCh chan struct{}) { // 每个协程里面都有一个死循环,去等待取消消息
			for {
				if isCancelled(cancelChan) {
					break
				}
				time.Sleep(time.Millisecond * 5) // 模拟延迟5毫秒
			}
			fmt.Println(i, "Cancelled") // 说明退出了死循环,打印日志
		}(i, cancelChan)
	}
	cancel_2(cancelChan) // 通知所有channel关闭。
	time.Sleep(time.Second * 1)
}
复制代码

七、Context与关联任务取消

刚才我们通过 close channel 来取消任务,但会有些问题。 比如,当一个任务被取消后,它所关联的子任务也应该被立即取消。

为了解决这个问题, go 1.9.0 之后, golang 加入了 context ,来保证关联任务的取消。

1. Context

context 就是用于管理相关任务的上下文,包含了共享值的传递,超时,取消通知。

结构体如下:

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}
复制代码
  1. Deadline 会返回一个超时时间, Goroutine 获得了超时时间后,例如可以对某些io操作设定超时时间。
  2. Done 方法返回一个信道( channel ),当 Context 被撤销或过期时,该信道是关闭的,即它是一个表示 Context 是否已关闭的信号。
  3. Done 信道关闭后,Err方法表明Context被撤的原因。
  4. Value 可以让 Goroutine 共享一些数据,当然获得数据是协程安全的。但使用这些数据的时候要注意同步,比如返回了一个 map ,而这个 map 的读写则要加锁。

要点:

context.Background()
context.WithCancel(parentContext)
<-ctx.Done

2. 关联任务取消

我们把刚才的例子稍加调整,通过context来取消所有关联的任务。

  • 首先,创建一个 context
ctx, cancel := context.WithCancel(context.Background()) // 创建一个子context
复制代码
  • 编写一个取消方法,把 context 作为参数。
func isCancelled(ctx context.Context) bool {
    select {
    case <-ctx.Done():
        return true
    default:
        return false
    }
}
复制代码
  • 开五个协程死循环,每个协程里面都有一个死循环,等待取消任务消息。再调用 cancel 方法。
for i := 0; i < 5; i++ {                                // 开启5个协程
        go func(i int, ctx context.Context) { // 每个协程里面都有一个死循环,去等待取消消息
            for {
                if isCancelled(ctx) {
                    break
                }
                time.Sleep(time.Millisecond * 5) // 模拟延迟5毫秒
            }
            fmt.Println(i, "Cancelled") // 说明退出了死循环,打印日志
        }(i, ctx)
    }
    cancel() // 取消ctx
复制代码

完整示例代码如下:

func isCancelled(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

func TestCancel(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background()) // 创建一个子context
	for i := 0; i < 5; i++ {                                // 开启5个协程
		go func(i int, ctx context.Context) { // 每个协程里面都有一个死循环,去等待取消消息
			for {
				if isCancelled(ctx) {
					break
				}
				time.Sleep(time.Millisecond * 5) // 模拟延迟5毫秒
			}
			fmt.Println(i, "Cancelled") // 说明退出了死循环,打印日志
		}(i, ctx)
	}
	cancel() // 取消ctx
	time.Sleep(time.Second * 1)
}
复制代码

八、常见并发任务(实战)

1. 只执行一次(单例模式)

场景:在多协程的情况下,保证某段代码只执行一次。

type Singleton struct {
	data string
}

var singleInstance *Singleton
var once sync.Once

func GetSingletonObj() *Singleton {
	once.Do(func() {
		fmt.Println("Create Obj")
		singleInstance = new(Singleton)
	})
	return singleInstance
}

func TestGetSingletonObj(t *testing.T) {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			obj := GetSingletonObj()
			fmt.Printf("%p\n", obj)
			wg.Done()
		}()
	}
	wg.Wait()
}
复制代码

2. 仅需任意任务完成

利用channel管道通信的机制,我们可以再任何一个协程完成任务时,就给对象发消息。

func runTask(id int) string {
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("The result is from %d", id)
}

func firstResponse() string {
	numOfRunner := 10
	ch := make(chan string, numOfRunner) // 创建bufferChannel。(如果用channel会导致协程泄漏,剩下9个channel会一直阻塞在系统中。)
	for i := 0; i < numOfRunner; i++ { // 开了10个协程
		go func(i int) {
			ret := runTask(i) // 每个协程去执行任务
			ch <- ret
		}(i)
	}
	return <-ch // 返回channel里的第一个Response。(因为channel是一个先进先出的管道)
}

func TestFirstResponse(t *testing.T) {
	t.Log(firstResponse()) // 发现每次运行返回的都不一样,会根据协程完成任务的一个顺序返回。
}
复制代码

3. 所有任务完成

刚才,我们介绍了first response,接下来我们看一下all response该怎么做。思路是一样的,只要接收到所有 channel 返回的数据,再返回即可。

func runTask(id int) string {
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("The result is from %d", id)
}

func allResponse() string {
	numOfRunner := 10
	ch := make(chan string, numOfRunner) // 创建bufferChannel。
	for i := 0; i < numOfRunner; i++ {   // 开了10个协程
		go func(i int) {
			ret := runTask(i) // 每个协程去执行任务
			ch <- ret
		}(i)
	}
	finalRet := ""
	for j := 0; j < numOfRunner; j++ {
		finalRet += <-ch + "\n"
	}
	return finalRet // 返回channel里的所有的Response。(因为channel是一个先进先出的管道)
}

func TestAllResponse(t *testing.T) {
	t.Log("Before:", runtime.NumGoroutine()) // 打印一下当前的协程数量
	t.Log(allResponse())                     // 发现每次运行返回的都不一样,会根据协程完成任务的一个顺序返回。
	t.Log("After:", runtime.NumGoroutine()) // 再打印一下当前的协程数量
}
复制代码

4. 对象池

我们可以用buffer channel的管道特性来做一个对象池。

emyIfqe.png!web

Demo:

type ReusableObj struct {
}

type ObjPool struct {
	bufChan chan *ReusableObj // 用于缓冲可重用对象
}

// 生产指定数量对象的对象池
func NewObjPool(numOfObj int) *ObjPool {
	ObjPool := ObjPool{}
	ObjPool.bufChan = make(chan *ReusableObj, numOfObj)
	for i := 0; i < numOfObj; i++ {
		ObjPool.bufChan <- &ReusableObj{}
	}
	return &ObjPool
}

// 从对象池中获得对象
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
	select {
	case ret := <-p.bufChan:
		return ret, nil
	case <-time.After(timeout): // 超时控制
		return nil, errors.New("time out")
	}
}

// 释放对象池里的对象
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
	select {
	case p.bufChan <- obj:
		return nil
	default:
		return errors.New("overflow")
	}
}

func TestObjPool(t *testing.T) {
	pool := NewObjPool(10) // 生产一个10容量大小的对象池
	for i := 0; i < 10; i++ {
		if v, err := pool.GetObj(time.Second * 1); err != nil { // 获取obj
			t.Error(err)
		} else {
			fmt.Printf("%T\n", v)                      // 获取成功,答应日志。
			if err := pool.ReleaseObj(v); err != nil { // 释放obj
				t.Error(err)
			}
		}
	}
	fmt.Println("Done.")
}
复制代码

5. sync.pool对象缓存

我们可以通过sync.pool做对象缓存(创建、获取、缓存的策略)。

对象获取策略:

  1. 首先,尝试从私有对象获取。

  2. 其次,如果私有对象不存在,就尝试从当前 Process 的共享池获取。

  3. 如果当前 Process 的共享池是空的,就尝试从其他 Process 的共享池获取。

  4. 如果所有 Process 的共享池都是空的,就从 sync.pool 指定的 New 方法中 “New” 一个新的对象返回。

sync.pool缓存对象的生命周期:

  • 每一次 GC (垃圾回收)都会清除sync.pool的缓存对象。

  • 因此,对象缓存的有效期为下一次 GC 之前。

基本使用:

func TestSyncPool(t *testing.T) {
	pool := &sync.Pool{
		New: func() interface{} { // 创建一个新的对象
			fmt.Println("Create a new object.")
			return 100
		},
	}

	v := pool.Get().(int) // 获取对象
	fmt.Println(v)
	pool.Put(3) // 放回对象
	// runtime.GC() // 触发GC,会清除sync.pool中缓存的对象
	v1, _ := pool.Get().(int)
	fmt.Println(v1)
}
复制代码

多协程下的使用:

func TestSyncPoolInMultiGroutine(t *testing.T) {
	pool := &sync.Pool{
		New: func() interface{} {
			fmt.Println("Create a new object.")
			return 10
		},
	}

	pool.Put(100)
	pool.Put(100)
	pool.Put(100)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {// 创建10个协程
		wg.Add(1) 
		go func(id int) {
			fmt.Println(pool.Get()) // 获取对象
			wg.Done() 
		}(i)
	}
	wg.Wait()
}
复制代码

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK