7

nodejs收集日志,rsyslog同步收集入es的实施

 3 years ago
source link: https://zhuanlan.zhihu.com/p/31572953
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

nodejs收集日志,rsyslog同步收集入es的实施

前端开发话题下的优秀回答者

之前写过相关的2篇文章:

1,前端异常监控系统的落地

2,ElasticSearch和Gome-error-report的安装教程

其实中间还忽略了一个问题,在文章2中,我只是在GER-server的项目中增加了一个ES的create log的接口,这么做其实是一个不太好的方案,因为通过API的方式远程写日志,在大并发的情况下并不是最优的,会遇到一些瓶颈和写入失败的问题,如何能够避免呢?换了公司之后,自己在新公司做了一套新的日志收集方案,这里记录并且分享给大家。

如果没有读过前两篇文章的建议去阅读前两篇之后再来看这一篇,本文主要是说日志的落地和同步收集的实施。

首先来看一下这张图:

本文主要是说错误SDK上报后,我们如何来做负载和日志的同步,这里的技术栈包括了rsyslog,es,nodejs,crontab。

一,日志收集机的配置

首先,一开始设计的是sdk上报后,先推到nginx,拿nginx的access log做落地的,但是后来发现,当SDK上报一条信息,包含多条错误日志时,这种做法很蠢,因为在rsyslog中,都是一条一条去push到es的,中间需要处理的过程比较多,当然可以在nginx那一层拿lua来做一次处理,这里我选择使用nodejs。

处理逻辑比较简单,我贴一下完整代码:

let express = require('express');
let app = express();
let port = 80;
let fs = require('fs');
let fsExtra = require('fs-extra');
let path = require('path');
let moment = require('moment');
let accessDir = '/var/log/jslogs/';
let accessPath = accessDir + 'jserror.access.log';
let logpath = path.resolve(__dirname, accessPath);
let async = require('async');

fsExtra.ensureDirSync(accessDir);

var rotatingLogStream = require('file-stream-rotator').getStream({
  filename: accessPath,
  frequency: "1h",
  verbose: false,
  max_logs: "5d",
  audit_file: "/var/log/jslogs/log-audit.json"
});

app.get('/read.gif', (req, res, next) => {
  let img = fs.createReadStream(path.resolve(__dirname, './images/read.gif'));
  var err_msg = req.query.err_msg;
  if (err_msg) {
    let logs = [];
    let writelogs = [];
    let errmsg = decodeURIComponent(err_msg);
    // | 会分割成多条
    let errLogs = errmsg.split('|');
    errLogs.forEach((msg) => {
      let params = {
        log_master: 'js',
        ext: '-'
      };
      msg = msg.replace(/\^/g, '&');
      msg = msg.split('&');
      msg.forEach((item) => {
        item = item.split('=');
        params[item[0]] = item[1];
      });
      let timestamp = moment().format();
      let request_time = moment().format('YYYY-MM-DD hh:mm:ss');
      let log = {
        project_name: "JS",
        '@timestamp': timestamp,
        request_time: request_time,
        message: {
          log_master: params.log_master,
          msg: params.msg,
          projectType: params.projectType,
          currentUrl: params.currentUrl,
          flashVer: params.flashVer,
          level: params.level,
          referer: params.referer,
          screenSize: params.screenSize,
          timestamp: params.timestamp,
          userAgent: params.userAgent,
          title: params.title,
          host: params.host,
          colNum: params.colNum,
          rowNum: params.rowNum,
          targetUrl: params.targetUrl,
          ext: params.ext
        }
      }
      log = '@cee: ' + JSON.stringify(log) + '\n';
      writelogs.push(function(cb) {
	rotatingLogStream.write(log + '\n',cb);
      });
    });
    //写入日志
    async.parallel(writelogs);
    img.pipe(res);
  } else {
    img.pipe(res);
  }
});

app.use((req, res, next) => {
  res.status(404);
  res.send('404: File Not Found');
});


app.listen(port);
console.log('the server is listen on %s', port);

上面的代码可以简单说一下作用,nodejs监听80端口,然后get请求有一个read.gif的1x1图片接口,别人用err_msg=xxx之后访问,解析出参数最后再落到es格式的日志,日志切分用的是file-stream-rotator,每1小时切一次,保留5天内的日志。

日志参数详见GER SDK的上报格式,这里存的是一致的日志格式。

然后我们看一下rsyslog的设置,rsyslog分为客户端和服务端,客户端为收集日志的配置,既把nodejs的落地日志推到服务端机器,服务端则做接收解析和推入es中。

下面说一下rsyslog的安装和配置,我这边做的系统是centos6.5,默认安装的是5.8,需要安装最新的rsyslog以支持更多的插件和template语法。

wget http://rpms.adiscon.com/v8-stable/rsyslog.repo
mv rsyslog.repo /etc/yum.repos.d/rsyslog.repo
yum info rsyslog --skip-broken
yum install -y rsyslog rsyslog-elasticsearch rsyslog-mmjsonparse
rsyslogd -version

切记要装rsyslog的elasticsearch和mmjsonparse的扩展,当然这2个模块,在服务端的rsyslog上安装即可。

然后我们看一下客户端是如何配置的,在/etc/rsyslog.conf文件后添加:

module(load="imfile")
ruleset(name="remote"){
  action(type="omfwd" Protocol="tcp" Target="服务端ip" Port="514") stop
}
input(type="imfile"
    File="/var/log/jslogs/*.log.*"
    Facility="user"
    Severity="error"
    Tag="web_access"
    PersistStateInterval="1"
    Ruleset="remote")

这段配置指的是把/var/log/jslogs/下面的所有log,通过tcp推送到服务端的514端口。

然后我们再配置一下rsyslog的开机启动即可。

chkconfig rsyslog on

然后我们再配置一下nodejs的开机启动和pm2的日志,首先是安装pm2:

npm install pm2 -g
pm2 install pm2-logrotate
npm install pm2-gui

以上分别为pm2,pm2日志切割模块,pm2的可视化server。

然后使用pm2-gui start 启动可视化的pm2管理界面,默认端口是8088,默认密码:AuTh.

然后pm2启动我们上面的nodejs日志记录。

pm2 start index.js --name errorNodeLog -o ./logs/access.out.log -e ./logs/error.out.log

然后我们设置pm2的日志切割参数:

pm2 set pm2-logrotate:compress true 
pm2 set pm2-logrotate:rotateInterval '* * 1 * * * *'

设置完毕后,我们设置pm2的开机启动:

pm2 save
pm2 startup centos6

基本到这里,我们的客户端机器就搞定了。当然,安装pm2之前,如果你是新机器,还得安装nodejs,这里推荐用官方的方法,yum安装:

Installing Node.js via package manager | Node.js

启动好rsyslog和pm2的nodejs服务后,我们测试一下接口,是否会成功写入日志,然后我们开始配置服务端的机器。

二,日志服务端的配置:

日志服务的配置,我们要安装的软件如下:java,es,nodejs,GER-sever,以及rsyslog。

java比较好安装,yum list java* 装1.7或者1.8的都可以,这里不说了,nodejs安装同上。

然后是es和GER-server的安装,文章开头有讲,这里更新一下mapping的设置:

curl -XPUT esip:port/_template/template_1 -d '{"template":"logstash-web_access*","mappings":{"logs":{"properties":{"@timestamp":{"type":"date"},"message":{"properties":{"host":{"type":"string","index":"not_analyzed"}}}}}}}' 

自行更换esip和port即可。

然后记得安装一下可视化的es插件:

elasticsearch/bin/plugin install mobz/elasticsearch-head 

然后访问 http://{你的ip地址}:9200/_plugin/head/看是否安装成功,默认是9200端口,如果你改了es的端口,就修改9200地址。

rsyslog的安装上面说过了,我们直接看服务端的配置,在/etc/rsyslog.conf下加入下面配置:

module(load="imtcp")
module(load="mmjsonparse")
module(load="omelasticsearch")

input(type="imtcp" Port="514" Ruleset="apprule")

template(name="DynFile" type="string" string="/var/log/applog/%$year%-%$month%-%$day%/%fromhost-ip%.log")

template(name="logstash-web_access" type="list") {
    constant(value="logstash-") property(name="syslogtag" format="json")
    constant(value="-")
    property(name="timereported" dateFormat="rfc3339" position.from="1" position.to="4")
    constant(value=".")
    property(name="timereported" dateFormat="rfc3339" position.from="6" position.to="7")
    constant(value=".")
    property(name="timereported" dateFormat="rfc3339" position.from="9" position.to="10")
}

template(name="es-tpl" type="list"){
    constant(value="{\"@timestamp\":\"") property(name="timereported" dateFormat="rfc3339")
    constant(value="\",") property(name="$!all-json" position.from="2")
}

template( name="nginx-log" type="string" string="%msg%\n" )

ruleset(name="apprule") {
    action(type="mmjsonparse")
    action(type="omfile" DynaFile="DynFile")
    action(type="omelasticsearch"
        template="es-tpl"
        searchIndex="logstash-web_access"
        dynSearchIndex="on"
        bulkmode="on"
        searchType="logs"
        queue.type="LinkedList"
        queue.size="1000"
        queue.dequeuebatchsize="300"
        action.resumeretrycount="-1"
        server="es-ip"
        serverport="es-port"
        errorFile="/var/log/applog/es-error.log") stop
}

请自行替换最后面omelasticsearch的配置,把server和serverport换成对应自己的服务和端口地址,简单说一下这个配置做了什么。

首先加载几个需要用到的module,然后下面定义了入es的template,模板方面的语法可以自己参考一下rsyslog官方的文档,比较简单,主要是字符替换和格式替换用。因为我们同步的日志只有我们nodejs收集的程序日志,所以订了一个apprule的规则,最后配置这个apprule规则,先是把日志json进行解析,然后按照DynFile的template格式写入日志,最后再推送到es中。

三,测试:

配置完成后我们通过刷新客户端的接口来进行测试,可以tail一下2台日志的日志做对比,如果同步成功就算大功告成了,当然最后还要查看一下es里是否真正的推入了日志:

es中如果也推数据成功,根据上一篇文章中的GER-Server来配置可视化的错误平台就成了,之前的专栏文章都有说过。

最后,由于es也要做成开机启动,建议都拿chkconfig来进行维护,使用yum安装,然后同样给可视化的平台加上pm2相关的配置和开机启动。

下面贴一下,crontab下我们如何对es的索引进行管理的脚本:

const request = require('request');
const moment = require('moment');
const url = require('./config').url;

const date15Ago = moment().subtract(16, 'd'); // 获取15天之前的日期
const date15AgoStr = date15Ago.year() + '.' + (date15Ago.month() + 1) + '.' + date15Ago.date();

module.exports = function () {
    // 删除15天前的日志日志
    request.delete({
        url: url + date15AgoStr,
        json: true
    }, function (error, response, body) {
        if(!error){
            if(response.statusCode == 200){
                console.info('success: delete logs of ' + date15AgoStr);
            } else if(response.statusCode == 404) {
                console.error('error: error in delete log:\nlog does not exist');
            } else {
                console.error('error: error in delete log:\n');
                console.error(body);
            }
        }
    });
}

删除15天之前的索引脚本。

const request = require('request');
const moment = require('moment');
const config = require('./config');

const date = moment().subtract(1, 'd'); // 获取前一天的日期
const dateStr = date.year() + '.' + (date.month() + 1) + '.' + date.date();

module.exports = function () {
    getLogInfo(function (data) {
        saveLogInfo(data);
    });

    function getLogInfo(cb) {
        // 统计前一天的日志信息
        request({
            url: config.url + dateStr + '/logs/_search',
            json: true,
            body: {
                "size" : 0,
                "aggs" : {
                    "projectType" : {
                        "terms" : {
                            "field" : "message.projectType"
                        },
                        "aggs" : {
                            "hosts" : {
                                "terms" : {
                                    "field" : "message.host"
                                }
                            }
                        }
                    }
                }
            }}, function (error, response, body) {
            // console.log(body);
            // console.log(JSON.stringify(body.aggregations.projectType.buckets));
            if(!error){
                if(response.statusCode == 200){
                    console.info('success: get log info');
                    cb(body.aggregations.projectType.buckets);
                } else if(response.statusCode == 404){
                    console.error('error: error in get log info:\nlog does not exist');
                } else {
                    console.error('error: error in get log info:\n');
                    console.error(body);
                }
            }
        });
    }

    // 存储前一天的日志信息
    function saveLogInfo(data) {
        request.post({
            url: config.baseUrl + 'log_count/' + dateStr,
            json: true,
            body: {
                info: data
            }
        }, function (error, response, body) {
            if(!error){
                if(response.statusCode == 201){
                    console.info('success: save log info');
                } else {
                    console.error('error: error in saving log info:\n');
                    console.error(body);
                }
            }
        });
    }
}

统计前一天有多少域名,多少端(pc|mobile)错误count的脚本,然后再反存回es。

最后我们再通过crontab把2个脚本做一个定时执行即可,把入口的js文件开头加入:

#!/usr/bin/env node

这一行就可以啦。

ok,做一个记录,怕过2星期我自己也忘了,多谢大家观看。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK