3

在Go中集成ELK服务

 3 years ago
source link: https://jasonkayzk.github.io/2021/05/16/%E5%9C%A8Go%E4%B8%AD%E9%9B%86%E6%88%90ELK%E6%9C%8D%E5%8A%A1/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

在前面两篇文章中,分别使用Docker-Compose构建了包含Filebeat和不包含Filebeat的ELK版本;

本文在成功构建了ELK服务的基础之上,简单介绍如何在Go项目中集成ELK服务;

系列文章:

在Go中集成ELK服务

前言

本文建立在你已经成功通过上一篇文章构建了ELK服务的基础之上;

因此,在阅读本文之前,请先确保已经成功部署了ELK服务;

如果不知道如何部署,建议先阅读:

此外,为了简单起见,本文中的Logstash配置和部署中的配置相同:

logstash.conf

input {
  tcp {
    mode => "server"
    host => "0.0.0.0"
    port => 5044
    codec => json
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "%{[service]}-%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }
}
  • LogStash通过TCP连接的方式收集日志;
  • 同时上传ES时的索引格式为{service}-{date}

在Go中使用TCP连接上传日志

编写上传代码

既然在配置中声明的Logstash是通过TCP连接上传日志的,则我们通过在Go中创建一个TCP连接,上传日志即可;

代码如下:

logstash_demo.go

package main

import (
    "errors"
    "fmt"
    "net"
    "time"
)

// Logstash的TCP连接
type Logstash struct {
    Hostname   string
    Port       int
    Connection *net.TCPConn
    Timeout    int
}

// 创建一个Logstash连接
func New(hostname string, port int, timeout int) *Logstash {
    l := Logstash{}
    l.Hostname = hostname
    l.Port = port
    l.Connection = nil
    l.Timeout = timeout
    return &l
}

// 设置连接超时
func (l *Logstash) setTimeouts() {
    deadline := time.Now().Add(time.Duration(l.Timeout) * time.Millisecond)
    _ = l.Connection.SetDeadline(deadline)
    _ = l.Connection.SetWriteDeadline(deadline)
    _ = l.Connection.SetReadDeadline(deadline)
}

// 创建TCP连接
func (l *Logstash) Connect() (*net.TCPConn, error) {
    var connection *net.TCPConn
    service := fmt.Sprintf("%s:%d", l.Hostname, l.Port)
    addr, err := net.ResolveTCPAddr("tcp", service)
    if err != nil {
        return connection, err
    }
    connection, err = net.DialTCP("tcp", nil, addr)
    if err != nil {
        return connection, err
    }
    if connection != nil {
        l.Connection = connection
        _ = l.Connection.SetLinger(0) // default -1
        _ = l.Connection.SetNoDelay(true)
        _ = l.Connection.SetKeepAlive(true)
        _ = l.Connection.SetKeepAlivePeriod(time.Duration(5) * time.Second)
        l.setTimeouts()
    }
    return connection, err
}

// 写入数据
func (l *Logstash) Writeln(message string) error {
    var err = errors.New("tpc connection is nil")
    message = fmt.Sprintf("%s\n", message)
    if l.Connection != nil {
        _, err = l.Connection.Write([]byte(message))
        if err != nil {
            if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
                _ = l.Connection.Close()
                l.Connection = nil
            } else {
                _ = l.Connection.Close()
                l.Connection = nil
                return err
            }
        } else {
            // Successful write! Let's extend the timeout.
            l.setTimeouts()
            return nil
        }
    }
    return err
}

func main() {
    l := New("192.168.24.88", 5044, 5)
    if _, err := l.Connect(); err != nil {
        panic(err)
    }

    if err := l.Writeln(`{ "foo" : "bar", "service": "test-service" }`); err != nil {
        panic(err)
    }
}

代码首先创建了一个Logstash类,代表了一个对于Logstash的TCP连接;

函数New即一个初始化Logstash连接的函数;

函数Connect用于将当前Logstash连接对象和Logstash服务器建立连接;

函数Writeln用于向TCP连接中写入数据,即提交一条JSON格式的日志;

最后,在main函数中,我们首先指定logstash服务器参数并创建了一个TCP连接,随后进行了连接,并提交了一条JSON格式的日志:

{ 
    "foo" : "bar", 
    "service": "test-service" 
}

日志中指定了servicetest-service,这将通过Logstash建立一个索引test-service-2021-05-16的索引(因为今天是2021年05月16日);

测试

代码编写完毕后,接下来我们进行测试;

首先启动ELK服务:

docker-compose up -d
Creating network "elk-single_default" with the default driver
Creating elk-single_elasticsearch_1 ... done
Creating elk-single_kibana_1        ... done
Creating elk-single_logstash_1      ... done

访问Kibana,结果如下:

即这时整个ELK是空的,我们没有数据,也没有为数据创建索引;

现在我们执行go项目:

go run logstash_demo.go

执行后查看Docker中的Logstash的日志:

docker logs -f elk-single_logstash_1
...
[2021-05-16T08:18:35,338][ERROR][logstash.inputs.tcp      ] Error in Netty pipeline: java.io.IOException: Connection reset by peer
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/awesome_print-1.7.0/lib/awesome_print/formatters/base_formatter.rb:31: warning: constant ::Fixnum is deprecated
{
          "host" => "192.168.24.1",
           "foo" => "bar",
      "@version" => "1",
          "port" => 53635,
    "@timestamp" => 2021-05-16T08:18:35.325Z,
       "service" => "test-service"
}

可见我们通过TCP连接提交的日志的确被Logstash解析了;

并且刷新Kibana,可以看到已经解析到了这个索引:

我们创建test-service-*的索引,并选择Time Filter@timestamp

随后,进行查询:

可见,我们提交的日志的确显示在了Kibana中(忽略另外一条测试日志);

在Go中集成ELK成功!

除了TCP连接之外,Logstash还支持各种各样的数据input形式;

这里不在介绍,感兴趣的可以看Logstash的官方文档:

使用ES Client上传日志

除了通过Logstash对日志进行收集之外,ES本身也是支持日志提交的;

比如:通过RESTful形式的API请求提交等等;

当然ES官方也提供了Go的客户端,可以通过Go直接操作ES;

既然可以通过客户端直接上传日志到ES中,为什么还要使用Logstash呢?

这是因为Logstash中提供了大量的配置参数,可以对大量日志进行提取、过滤,并且支持各种各样的数据源;

所以在使用时,一般都会使用Logstash进行日志的过滤和整理,然后再提交至ES中;

有关ES Client,这里不再赘述,感兴趣的可以看:

附录

系列文章:

相关阅读:


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK