6

高性能消息中间件 NSQ 解析-nsqlookupd 实现细节介绍

 2 years ago
source link: http://blueskykong.com/2021/04/05/nsq-4/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

高性能消息中间件 NSQ 解析-nsqlookupd 实现细节介绍

我们在 前面 介绍了 nsq 的相关概念以及 nsq 的安装与应用以及 nsqd 的实现原理。本篇将会结合源码介绍 nsqlookupd 的实现细节。

nsqlookupd 主要流程与上一篇文章介绍的 nsqd 执行逻辑相似,区别在于具体运行的任务不同。

在 nsq/apps/nsqlookupd/main.go 可以找到执行入口文件。

// 位于apps/nsqlookupd/main.go:45
func main() {
prg := &program{}
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
logFatal("%s", err)
}
}

func (p *program) Init(env svc.Environment) error {
if env.IsWindowsService() {
dir := filepath.Dir(os.Args[0])
return os.Chdir(dir)
}
return nil
}

func (p *program) Start() error {
opts := nsqlookupd.NewOptions()

flagSet := nsqlookupdFlagSet(opts)
...
}

同样,通过第三方 svc 包进行优雅的后台进程管理,svc.Run() -> svc.Init() -> svc.Start(),启动 nsqlookupd 实例。

// 位于 apps/nsqlookupd/main.go:80
options.Resolve(opts, flagSet, cfg)
nsqlookupd, err := nsqlookupd.New(opts)
if err != nil {
logFatal("failed to instantiate nsqlookupd", err)
}
p.nsqlookupd = nsqlookupd

go func() {
err := p.nsqlookupd.Main()
if err != nil {
p.Stop()
os.Exit(1)
}
}()

初始化配置参数(优先级:flagSet-命令行参数 > cfg-配置文件 > opts-默认值),开启协程,进入 nsqlookupd.Main() 主函数。

我们来看下 nsqlookupd 是如何监听请求的,代码实现如下:

// 位于 nsqlookupd/nsqlookupd.go:53
func (l *NSQLookupd) Main() error {
ctx := &Context{l}

exitCh := make(chan error)
var once sync.Once
exitFunc := func(err error) {
once.Do(func() {
if err != nil {
l.logf(LOG_FATAL, "%s", err)
}
exitCh <- err
})
}

tcpServer := &tcpServer{ctx: ctx}
l.waitGroup.Wrap(func() {
exitFunc(protocol.TCPServer(l.tcpListener, tcpServer, l.logf))
})
httpServer := newHTTPServer(ctx)
l.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.logf))
})

err := <-exitCh
return err
}

开启 goroutine 执行 tcpServer, httpServer,分别监听 nsqd, nsqadmin 的客户端请求。

// 位于 internal/protocol/tcp_server.go:17
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
logf(lg.INFO, "TCP: listening on %s", listener.Addr())

for {
clientConn, err := listener.Accept()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
logf(lg.WARN, "temporary Accept() failure - %s", err)
runtime.Gosched()
continue
}
// theres no direct way to detect this error because it is not exposed
if !strings.Contains(err.Error(), "use of closed network connection") {
return fmt.Errorf("listener.Accept() error - %s", err)
}
break
}
go handler.Handle(clientConn)
}

logf(lg.INFO, "TCP: closing %s", listener.Addr())

return nil
}

TCPServer 循环监听客户端请求,建立长连接进行通信,并开启 handler 处理每一个客户端 conn。

装饰 http 路由

httpServer 通过 http_api.Decorate 装饰器实现对各 http 路由进行 handler 装饰,如加 log 日志、V1 协议版本号的统一格式输出等;

func newHTTPServer(ctx *Context) *httpServer {
log := http_api.Log(ctx.nsqlookupd.logf)

router := httprouter.New()
router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.logf)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.logf)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.logf)
s := &httpServer{
ctx: ctx,
router: router,
}

router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))

// v1 negotiate
router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api.V1))
router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.V1))
router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.V1))
router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, http_api.V1))
}

处理客户端命令

tcp 解析 V1 协议,内部协议封装的 prot.IOLoop(conn) 进行循环处理客户端命令,直到客户端命令全部解析处理完毕才关闭连接。

var prot protocol.Protocol
switch protocolMagic {
case " V1":
prot = &LookupProtocolV1{ctx: p.ctx}
default:
protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL"))
clientConn.Close()
p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
return
}

err = prot.IOLoop(clientConn)

通过内部协议进行 p.Exec(执行命令)、p.SendResponse(返回结果),保证每个 nsqd 节点都能正确的进行服务注册(register)与注销(unregister),并进行心跳检测(ping)节点的可用性,确保客户端取到的 nsqd 节点列表都是最新可用的。

for {
line, err = reader.ReadString('\n')
if err != nil {
break
}

line = strings.TrimSpace(line)
params := strings.Split(line, " ")

var response []byte
response, err = p.Exec(client, reader, params)
if err != nil {
ctx := ""
if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
ctx = " - " + parentErr.Error()
}
_, sendErr := protocol.SendResponse(client, []byte(err.Error()))
if sendErr != nil {
p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
break
}
continue
}

if response != nil {
_, err = protocol.SendResponse(client, response)
if err != nil {
break
}
}
}

conn.Close()

nsqlookupd 服务同时开启 tcp 和 http 两个监听服务,nsqd 会作为客户端,连上 nsqlookupd 的 tcp 服务,并上报自己的 topic 和 channel 信息,以及通过心跳机制判断 nsqd 状态;还有个 http 服务提供给 nsqadmin 获取集群信息。

本文主要介绍 nsqlookupd 的实现,nsqlookupd 同样是一个守护进程,负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题( topic )的生产者,并且 nsqd 节点广播话题(topic)和通道( channel )信息。有两个接口: TCP 接口, nsqd 用它来广播。 HTTP 接口,客户端用它来发现和管理。

下一篇文章,将会继续介绍 nsq 中其他模块实现的细节。

高性能消息中间件 NSQ 解析-nsqd 实现细节介绍

高性能消息中间件 NSQ 解析-整体介绍

高性能消息中间件 NSQ 解析-应用实践

微服务架构中使用 ELK 进行日志采集以及统一处理

没有 try-catch,该如何处理 Go 错误异常?

订阅最新文章,欢迎关注我的公众号


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK