3

channel 实战应用,这篇就够了!

 3 years ago
source link: https://studygolang.com/articles/32839
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

有一说一,这篇文章有点标题党了,但是绝对是干货。

已经有很多关于 channel 的文章,为什么我还要写呢?任何知识点,只要你想,就可以从不同的角度切入!那就写点 channel 应用相关的东西。通过不同场景使用 channel 特性加深理解!所以在看这篇文章之前,首先得先去了解 channel。

由 channel 引发的血案

上面那篇文章漏了一个我觉得很关键的知识点,并且我们还经常在上面犯错误。即使是那些牛逼的开源项目,也有过类似 bug。

我的问题是:channel 的哪些操作会引发 panic?

1.关闭一个 nil 值 channel 会引发 panic。

package main

func main() {
  var ch chan struct{}
  close(ch)
}

U3UNJbM.png!mobile

2.关闭一个已关闭的 channel 会引发 panic。

package main

func main() {
  ch := make(chan struct{})
  close(ch)
  close(ch)
}

2Yr6R3B.png!mobile

3.向一个已关闭的 channel 发送数据。

package main

func main() {
  ch := make(chan struct{})
  close(ch)
  ch <- struct{}{}
}

QBZbY32.png!mobile

以上三种 channel 操作会引发 panic。

你可能会说,我咋么会犯这么愚蠢的错误。这只是一个很简单的例子,实际项目是很复杂的,一不小心,你就会忘了自己曾在哪一个 g 里关闭过 channel。

如果你对某块代码没有安全感,相信我,就算它中午不出事,早晚也得出事。

channel 的一些应用

  • 信号通知
  • 超时控制
  • 生产消费模型
  • 数据传递
  • 控制并发数
  • 互斥锁
  • one million……

1.信号通知

经常会有这样的场景,当信息收集完成,通知下游开始计算数据。

package main

import (
  "fmt"
  "time"
)

func main() {
  isOver := make(chan struct{})
  go func() {
    collectMsg(isOver)
  }()
  <-isOver
  calculateMsg()
}

// 采集
func collectMsg(isOver chan struct{}) {
  time.Sleep(500 * time.Millisecond)
  fmt.Println("完成采集工具")
  isOver <- struct{}{}
}

// 计算
func calculateMsg() {
  fmt.Println("开始进行数据分析")
}

如果只是单纯的使用通知操作,那么类型就使用 struct{} 。因为空结构体在 go 中是不占用内存空间的,不信你看。

package main

import (
"fmt"
"unsafe"
)

func main() {
  res := struct{}{}
  fmt.Println("占用空间:", unsafe.Sizeof(res))
}
//占用空间: 0

2.执行任务超时

我们在做任务处理的时候,并不能保证任务的处理时间,通常会加上一些超时控制做异常的处理。

package main

import (
  "fmt"
  "time"
)

func main() {
  select {
  case <-doWork():
    fmt.Println("任务结束")
  case <-time.After(1 * time.Second):
    fmt.Println("任务处理超时")
  }
}

func doWork() <-chan struct{} {
  ch := make(chan struct{})
  go func() {
    // 任务处理耗时
    time.Sleep(2 * time.Second)
  }()
  return ch
}

3.生产消费模型

生产者只需要关注生产,而不用去理会消费者的消费行为,更不用关心消费者是否执行完毕。而消费者只关心消费任务,而不需要关注如何生产。

package main

import (
  "fmt"
  "time"
)

func main() {
  ch := make(chan int, 10)
  go consumer(ch)
  go producer(ch)
  time.Sleep(3 * time.Second)
}

// 一个生产者
func producer(ch chan int) {
  for i := 0; i < 10; i++ {
    ch <- i
  }
  close(ch)
}

// 消费者
func consumer(task <-chan int) {
  for i := 0; i < 5; i++ {
    // 5个消费者
    go func(id int) {
      for {
        item, ok := <-task
        // 如果等于false 说明通道已关闭
        if !ok {
          return
        }
        fmt.Printf("消费者:%d,消费了:%dn", id, item)
        // 给别人一点机会不会吃亏
        time.Sleep(50 * time.Millisecond)
      }
    }(i)
  }
}

4.数据传递

极客上一道有意思的题,假设有4个 goroutine ,编号为1,2,3,4。每秒钟会有一个 goroutine 打印出它自己的编号。现在让你写一个程序,要求输出的编号总是按照1,2,3,4这样的顺序打印。类似下图,

R7nuy2U.png!mobile

package main

import (
  "fmt"
  "time"
)

type token struct{}

func main() {
  num := 4
  var chs []chan token
  // 4 个work
  for i := 0; i < num; i++ {
    chs = append(chs, make(chan token))
  }
  for j := 0; j < num; j++ {
    go worker(j, chs[j], chs[(j+1)%num])
  }
  // 先把令牌交给第一个
  chs[0] <- struct{}{}
  select {}
}

func worker(id int, ch chan token, next chan token) {
  for {
    // 对应work 取得令牌
    token := <-ch
    fmt.Println(id + 1)
    time.Sleep(1 * time.Second)
    // 传递给下一个
    next <- token
  }
}

5.控制并发数

我经常会写一些脚本,在凌晨的时候对内或者对外拉取数据,但是如果不对并发请求加以控制,往往会导致 groutine 泛滥,进而打满 CPU 资源。往往不能控制的东西意味着不好的事情将要发生。对于我们来说,可以通过 channel 来控制并发数。

package main

import (
  "fmt"
  "time"
)

func main() {
  limit := make(chan struct{}, 10)
  jobCount := 100
  for i := 0; i < jobCount; i++ {
    go func(index int) {
      limit <- struct{}{}
      job(index)
      <-limit
    }(i)
  }
  time.Sleep(20 * time.Second)
}

func job(index int) {
  // 耗时任务
  time.Sleep(1 * time.Second)
  fmt.Printf("任务:%d已完成n", index)
}

当然了, sync.waitGroup 也可以。

package main

import (
  "fmt"
  "sync"
  "time"
)

func main() {
  var wg sync.WaitGroup
  jobCount := 100
  limit := 10
  for i := 0; i <= jobCount; i += limit {
    for j := 0; j < i; j++ {
      wg.Add(1)
      go func(item int) {
        defer wg.Done()
        job(item)
      }(j)
    }
    wg.Wait()
  }
}

func job(index int) {
  // 耗时任务
  time.Sleep(1 * time.Second)
  fmt.Printf("任务:%d已完成n", index)
}

6.互斥锁

我们也可以通过 channel 实现一个小小的互斥锁。通过设置一个缓冲区为1的通道,如果成功地往通道发送数据,说明拿到锁,否则锁被别人拿了,等待他人解锁。

package main

import (
  "fmt"
  "time"
)

type ticket struct{}

type Mutex struct {
  ch chan ticket
}

// 创建一个缓冲区为1的通道作
func newMutex() *Mutex {
  return &Mutex{ch: make(chan ticket, 1)}
}

// 谁能往缓冲区为1的通道放入数据,谁就获取了锁
func (m *Mutex) Lock() {
  m.ch <- struct{}{}
}

// 解锁就把数据取出
func (m *Mutex) unLock() {
  select {
  case <-m.ch:
  default:
    panic("已经解锁了")
  }
}

func main() {
  mutex := newMutex()
  go func() {
    // 如果是1先拿到锁,那么2就要等1秒才能拿到锁
    mutex.Lock()
    fmt.Println("任务1拿到锁了")
    time.Sleep(1 * time.Second)
    mutex.unLock()
  }()
  go func() {
    mutex.Lock()
    // 如果是2拿先到锁,那么1就要等2秒才能拿到锁
    fmt.Println("任务2拿到锁了")
    time.Sleep(2 * time.Second)
    mutex.unLock()
  }()
  time.Sleep(500 * time.Millisecond)
  // 用了一点小手段这里最后才能拿到锁
  mutex.Lock()
  mutex.unLock()
  close(mutex.ch)
}

到这里,这篇文章已经尾声了。当然我只是列举了部分 channel 的应用场景。你完全可以发挥自己的想象,在实际工作中,构建更完美且贴近生产的设计。

如果你还有其他不同的应用模式场景,欢迎下方留言和我交流。

另外源码我放在 github 上了,地址: https://github.com/wuqinqiang/Go_Concurrency

如果文章对你有所帮助,点赞、转发、留言都是一种支持!

欢迎关注公众号 吴亲强的深夜食堂,一起学习。

有疑问加站长微信联系(非本文作者)

eUjI7rn.png!mobile

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK