21

golang select channel 如何保证安全退出,不丢失数据?

 4 years ago
source link: https://studygolang.com/articles/26902
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

今天研究了一下channel的源码,对channel的安全退出有了一些小见解。在此结合实际应用,对select 于channel结合对情况下,安全退出channel做一下记录。

场景1:直接退出(会丢失数据) 

因为退出时,直接程序就中断了,channel里存对数据直接丢失。

package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	wg      sync.WaitGroup
	channel = make(chan int, 10)
)

func main() {
	//先写满一个channel
	for i := 0; i < 10; i++ {
		channel <- i
	}

	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case num := <-channel:
				
				fmt.Println("======", num)
				//每次从channel取值后sleep 1秒,方便我们分析
				time.Sleep(time.Duration(num) * time.Second)

			}
		}
	}()
	wg.Wait()
}

场景2:捕捉程序退出信号,然后关闭channel (不丢失数据)

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

var (
	wg      sync.WaitGroup
	channel = make(chan int, 10000)
)

func main() {
	//先写满一个channel
	for i := 0; i < 10; i++ {
		channel <- i
	}
	wg.Add(1)
	go HandleSignals()
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case num, ok := <-channel:
				if !ok {
					return
				}
				fmt.Println("======", num)
				//每次从channel取值后sleep 1秒,方便我们分析
				time.Sleep(time.Duration(num) * time.Second)


			}
		}
	}()
	wg.Wait()
}
func HandleSignals() {
	defer wg.Done()

	ch := make(chan os.Signal, 10)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR2)
	for {
		sig := <-ch
		switch sig {
		case syscall.SIGINT, syscall.SIGTERM:
			close(channel)
			log.Println("Exiting, please wait...")
			return
		}
	}
}

以上实现是在捕捉到系统退出信号时 执行了 close(channel) 。 从而实现,完全退出前,仍将缓存在channel中到数据,读出并执行。

那是怎么实现的呢? 通过阅读源码 go/src/runtime/chan.go: closechan

看到以下实现,可以看到,在close channel时,仍会将channel中的数据读出来。 因此,我们要使用此特性时,就需要根据系统退出信号,关闭channel。然后判断channel是否关闭,若关闭,再退出for循环。 否则,直接退出的程序,就会直接将channel中的数据抛弃。

func closechan(c *hchan) {
    ...
    lock(&c.lock)
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }
    ...

    c.closed = 1

    var glist gList

    // release all readers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    ...
}

原文地址: https://www.yuanshuli.com/post-68.html


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK