6

MySQL MHA信息的收集【Filebeat+logstash+MySQL】 - 东山絮柳仔

 1 year ago
source link: https://www.cnblogs.com/xuliuzai/p/17320183.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

一.项目背景

随着集团MHA集群的日渐增长,MHA管理平台话越来越迫切。而MHA平台的建设第一步就是将这些成百上千套的MHA集群信息收集起来,便于查询和管理。

MHA主要信息如下:

(1)基础配置信息;

(2)运行状态信息;

(3)启动及FailOver的log信息。

集团目前数据库的管理平台是在Archery的基础上打造,所以,需要将此功能嵌入到既有平台上。通过Archery系统进行查询展示。

780228-20230415095747826-1661925485.png

 简单来说,通过 Filebeat + Logstash + MySQL 架构 来收集保存各个集群的配置信息、启动及FailOver的log信息 和运行状态信息。

运行状态信息是通过一个小程序获取的,这个小程序每分钟执行一次,会把执行结果输出到文件中。当然这个文件是被failbeat监控的。

3.1 获取MHA状态的脚本

文件为mha_checkstatus.py

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import os
import io
import re
import ConfigParser

Path='/etc/mha'
#fout=open('输出文件名','w')
for Name in os.listdir(Path) :
  Pathname= os.path.join(Path,Name)
 ## print(Pathname)
 ## print(Name)
  config =ConfigParser.ConfigParser()
  try:
    config.read(Pathname)
    server_item = config.sections()
    server1_host = ''  ##MHA cnf 配置文件中的节点1
    server2_host = ''  ##MHA cnf 配置文件中的节点2
    server3_host = ''  ##MHA cnf 配置文件中的节点3
    mha_cnf_remark = ''
    if 'server1' in server_item:
      server1_host = config.get('server1','hostname')
    else:
       mha_cnf_remark = mha_cnf_remark + 'Server1未配置;'
    if 'server2' in server_item:
      server2_host = config.get('server2','hostname')
    else:
      mha_cnf_remark = mha_cnf_remark + 'Server2未配置;'
    if 'server3' in server_item:
      server3_host = config.get('server3','hostname')

      ##print(mha_cnf_remark)
  except Exception as e:
    print(e)

  mha_status_result =''
  ###20190330
  Name = Name.replace(".cnf", "")

  ###集群一主一从
  if server1_host <> '' and server2_host <> '' and server3_host == '':
    cmd_mha_status ='/???/???/bin/masterha_check_status --conf='+Pathname
    with os.popen(cmd_mha_status) as mha_status:
      mha_status_result = mha_status.read()
      if 'running(0:PING_OK)' in mha_status_result:
        print(Name+':::'+Pathname+':::0:::'+server1_host+':::'+mha_status_result)
        print(Name+':::'+Pathname+':::0:::'+server2_host+':::'+mha_status_result)
      if 'stopped(2:NOT_RUNNING)' in mha_status_result:
        print(Name+':::'+Pathname+':::None:::'+server1_host+':::'+mha_status_result)
        print(Name+':::'+Pathname+':::None:::'+server2_host+':::'+mha_status_result)

  ####集群一主两从
  if server1_host <> '' and server2_host <> '' and server3_host <> '':
    cmd_mha_status ='/???/???/bin/masterha_check_status --conf='+Pathname
    with os.popen(cmd_mha_status) as mha_status:
      mha_status_result = mha_status.read()
      if 'running(0:PING_OK)' in mha_status_result:
        print(Name+':::'+Pathname+':::0:::'+server1_host+':::'+mha_status_result)
        print(Name+':::'+Pathname+':::0:::'+server2_host+':::'+mha_status_result)
        print(Name+':::'+Pathname+':::0:::'+server3_host+':::'+mha_status_result)
      if 'stopped(2:NOT_RUNNING)' in mha_status_result:
        print(Name+':::'+Pathname+':::None:::'+server1_host+':::'+mha_status_result)
        print(Name+':::'+Pathname+':::None:::'+server2_host+':::'+mha_status_result)
        print(Name+':::'+Pathname+':::None:::'+server3_host+':::'+mha_status_result)

 概况说明,就是到存放MHA配置的文件夹,根据每个集群的配置文档,去逐一执行下masterha_check_status,把结果格式化,输出到指定的文件中。这个就是每个集群的状态数据。通过filebeat实时汇报上去。

触发的方式可以是crontab,每分钟执行一次。再本案中是输出到 /???/checkmhastatus/masterha_check_status.log 中。

形式类似如下:

*/1 * * * * python /???/????/mha_checkstatus.py >>   /???/????/masterha_check_status.log

3.2 表的设计及脚本

3.2.1 运行状态表 dbmha_status

CREATE TABLE `dbmha_status` (
  `id` int NOT NULL AUTO_INCREMENT,
  `host` varchar(100) NOT NULL,
  `clustername` varchar(200) NOT NULL,
  `logpath` varchar(500) NOT NULL,
  `confpath` varchar(500) NOT NULL,
  `mhstatus` varchar(100) NOT NULL,
  `serverip` varchar(100) NOT NULL,
  `info` varchar(2000) NOT NULL,
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '收集时间',
  PRIMARY KEY (`id`),
  KEY `idx_createtime` (`create_time`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

3.2.2 mha log 信息表 dbmha_log

CREATE TABLE `dbmha_log` (
  `id` int NOT NULL AUTO_INCREMENT,
  `host` varchar(100) NOT NULL,
  `clustername` varchar(200) NOT NULL,
  `filename` varchar(200) NOT NULL,
  `logpath` varchar(500) NOT NULL,
  `message` longtext NOT NULL,
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '收集时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

3.2.3 MHA 基础配置表 dbmha_conf_info

CREATE TABLE `dbmha_conf_info` (
  `id` int NOT NULL AUTO_INCREMENT,
  `host` varchar(100) NOT NULL,
  `clustername` varchar(200) NOT NULL DEFAULT '',
  `confpath` varchar(500) NOT NULL DEFAULT '',
  `manager_log` varchar(500) NOT NULL DEFAULT '',
  `manager_workdir` varchar(500) NOT NULL DEFAULT '',
  `master_binlog_dir` varchar(500) NOT NULL DEFAULT '',
  `failover_script` varchar(500) NOT NULL DEFAULT '',
  `online_change_script` varchar(500) NOT NULL DEFAULT '',
  `password` varchar(128) NOT NULL DEFAULT '',
  `ping_interval` varchar(100) NOT NULL DEFAULT '',
  `remote_workdir` varchar(100) NOT NULL DEFAULT '',
  `repl_password` varchar(128) NOT NULL DEFAULT '',
  `repl_user` varchar(20) NOT NULL DEFAULT '',
  `ssh_user` varchar(20) NOT NULL DEFAULT '',
  `user` varchar(20) NOT NULL DEFAULT '',
  `serverip1` varchar(100) NOT NULL DEFAULT '',
  `port1` varchar(10) NOT NULL DEFAULT '',
  `candidate_master1` varchar(5) NOT NULL DEFAULT '',
  `check_repl_delay1` varchar(20) NOT NULL DEFAULT '',
  `serverip2` varchar(100) NOT NULL DEFAULT '',
  `port2` varchar(10) NOT NULL DEFAULT '',
  `candidate_master2` varchar(5) NOT NULL DEFAULT '',
  `check_repl_delay2` varchar(20) NOT NULL DEFAULT '',
  `serverip3` varchar(100) NOT NULL DEFAULT '',
  `port3` varchar(10) NOT NULL DEFAULT '',
  `candidate_master3` varchar(5) NOT NULL DEFAULT '',
  `check_repl_delay3` varchar(20) NOT NULL DEFAULT '',
  `info` longtext NOT NULL,
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '收集时间',
  PRIMARY KEY (`id`),
  KEY `idx_createtime` (`create_time`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

3.3 filbeat 中关于读取文件的配置

..............
- type: log
  paths:
    - /???/????/masterha_check_status.log
  fields:
    log_type: mha-status
    db_host: 111.111.XXX.1XX    ###这个IP为mha Mnaager所在serverip

- type: log
  paths:
    - /???/mhaconf/*.cnf
  fields:
    log_type: mha-cnf
    db_host: 111.111.XXX.XXX
  multiline.type: pattern
  multiline.pattern: '^\[server [[:space:]] default'
  multiline.negate: true
  multiline.match: after


- type: log
  paths:
    - /???/????/mha/*/*.log
  fields:
    log_type: mysql-mha
    db_host: 111.111.XXX.XXX
................

3.4 Logstash 的配置文件

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  beats {
    port => 5044
  }
}

filter {

    if [fields][log_type] == "mysql-mha" {
        grok {
            match => ["message", "(?m)^%{TIMESTAMP_ISO8601:timestamp} %{BASE10NUM} \[%{WORD:error_level}\] %{GREEDYDATA:error_msg}$"]
        }
        grok {
            match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"}
        }
        grok {
            match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}
        }
        date {
            match=> ["timestamp", "ISO8601"]
            remove_field => ["timestamp"]
        }
        mutate {
        copy => { "[log][file][path]" => "logpath"
                 "[fields][db_host]" => "manager_ip" }
        }
        mutate {
            remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"]
        }
    }


    if [fields][log_type] == "mha-cnf" {
        mutate {
        split => ["message","server"]
        add_field => {"message1" => "%{[message][1]}"}
        add_field => {"messages1" => "%{[message][2]}"}
        add_field => {"messages2" => "%{[message][3]}"}
        add_field => {"messages3" => "%{[message][4]}"}
        add_field => {"dft_password" => "*********"}
        add_field => {"dft_repl_password" => "*********"}
        }
        kv {
             source => "message1" 
             field_split => "\n"
             include_keys => ["manager_log", "manager_workdir", "master_binlog_dir", "master_ip_failover_script", "master_ip_online_change_script", "ping_interval", "remote_workdir", "repl_user", "ssh_user", "user" ]
             prefix => "dft_"
             remove_char_value => "<>\[\]," 
        }
        kv {
             source => "messages1"
             field_split => "\n"
             include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ]
             prefix => "s1_"
        }
        kv {
             source => "messages2"
             field_split => "\n"
             default_keys => [ "s2_candidate_master", "",
                         "s2_check_repl_delay", "",
                         "s2_hostname","",
                          "s2_port",""
                          ]
             include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ]
             prefix => "s2_"
        }
        kv {
             source => "messages3"
             field_split => "\n"
             default_keys => [ "s3_candidate_master", "",
                         "s3_check_repl_delay", "",
                         "s3_hostname","",
                          "s3_port","" 
                          ]
             include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ]
             prefix => "s3_"
        }
        grok {
            match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"}
            match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}
        }
        mutate {
             copy => { "[fields][db_host]" => "manager_ip" }
             copy => { "[log][file][path]" => "conf_path" }
             gsub => [
                      "message", "需要加密的***密***码", "*********",
                      "message", "需要加密的其他字符", "*********"
                      ]
        }
        date {
            match=> ["timestamp", "ISO8601"]
            remove_field => ["timestamp"]
        }
        mutate {
            remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"]
        }
    }

    if [fields][log_type] == "mha-status" {
       mutate {
        split => ["message",":::"]
        add_field => {"cluster_name" => "%{[message][0]}"}
        add_field => {"conf_path" => "%{[message][1]}"}
        add_field => {"masterha_check_status" => "%{[message][2]}"}
        add_field => {"server" => "%{[message][3]}"}
        add_field => {"info" => "%{[message][4]}"}
         }

        grok {
            match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}
        }
        mutate {
             copy => { "[fields][db_host]" => "manager_ip" }
        }
        date {
            match=> ["timestamp", "ISO8601"]
            remove_field => ["timestamp"]
        }
        mutate {
            remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"]
        }
    }

}


output {
    if [fields][log_type] == "mysql-mha" {
      jdbc {
           driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar"
           driver_class => "com.mysql.jdbc.Driver"
           connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????"
           statement => ["INSERT INTO dbmha_log (host,clustername,filename,logpath, message) VALUES(?, ?, ?, ?, ?)","%{manager_ip}","%{product}","%{filename}","%{logpath}","%{message}"]
       }
    }

    if [fields][log_type] == "mha-status" {
      jdbc {
           driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar"
           driver_class => "com.mysql.jdbc.Driver"
           connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????"
           statement => ["INSERT INTO dbmha_status (host,clustername,logpath,confpath,mhstatus,serverip,info) VALUES(?, ?, ?, ?, ?, ?, ?)","%{manager_ip}","%{cluster_name}","%{filename}","%{conf_path}","%{masterha_check_status}","%{server}","%{info}"]
       }
   }
    if [fields][log_type] == "mha-cnf" {
      jdbc {
           driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar"
           driver_class => "com.mysql.jdbc.Driver"
           connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????"
           statement => ["INSERT INTO dbmha_conf_info (host,clustername,confpath,manager_log,manager_workdir,master_binlog_dir,failover_script,online_change_script,password,ping_interval,remote_workdir,repl_password,repl_user,ssh_user,user,serverip1,port1,candidate_master1,check_repl_delay1,serverip2,port2,candidate_master2,check_repl_delay2,serverip3,port3,candidate_master3,check_repl_delay3,info) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)","%{manager_ip}","%{product}","%{conf_path}","%{dft_manager_log}","%{dft_manager_workdir}","%{dft_master_binlog_dir}","%{dft_master_ip_failover_script}","%{dft_master_ip_online_change_script}","%{dft_password}","%{dft_ping_interval}","%{dft_remote_workdir}","%{dft_repl_password}","%{dft_repl_user}","%{dft_ssh_user}","%{dft_user}","%{s1_hostname}","%{s1_port}","%{s1_candidate_master}","%{s1_check_repl_delay}","%{s2_hostname}","%{s2_port}","%{s2_candidate_master}","%{s2_check_repl_delay}","%{s3_hostname}","%{s3_port}","%{s3_candidate_master}","%{s3_check_repl_delay}","%{message}"]
       }
   }

}

 这个配置还是相对复杂难懂的。这个文件配置了对三种文件的读取,我们就看读取mha配置文件的部分【[fields][log_type] == "mha-cnf"】,我们挑其中的几个点说下,更多的内容可参照logstash官网--https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

首先,我们是 “server” 关键字,把文件中的配置信息,分割成不同的部分。

接着,因为配置文件的格式是 key=value的样式,所以需要借助 kv{},其中的参数说下:field_split---定义字段间的分隔符;include_keys--定义只读去规定的特定key;prefix---格式化字段名字,加个前缀名字,主要是用来区分server 1 部分和 server2、、、之间的分别。

 通过【match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"}】,获取product字段,我们是通过mha的配置文件的名字来定义集群的名字,即规范了mha配置文件的名字的命名来自于集群的名字,反推得知了配置文件的名字,就知道了集群的名字。【match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}】这个地方的filename包含了文件的后缀。

四.平台前端

我们是把此项目嵌入到既有的Archery平台中,增加了3个查询界面,界面的实现,在此就不具体展开了。

需要注意的是,界面需要支持模糊查询,例如支持MHA Manager Server IP查询(方便查询各个Manager节点上有多少集群);支持集群名字的模糊查询;支持节点serverIP的模糊查询。

五.补充说明

Q.1 为什么用MySQL存储信息,ELK是更成熟的架构啊?

是的,用elasticsearch来存储这种文本信息更常见。我们用MySQL替代elasticsearch基于以下考虑:(1)我们既有的管理平台使用的是MySQL,把他们保存到MySQL 便于集成;(2)这些数据,不仅仅是Log,还有些是基础数据,放到MySQL便于相互管理、聚合展示(3)这是数据量并不大,例如mha.log,只有在启动或者failover时才有变化,conf信息也是很少的,所以,从数据量也一点考虑,也不需要保存到MySQL。

Q.2 Logstash 可以把数据写入到MySql中吗?

是可以的。主要是logstash-output-jdbc、logstash-codec-plain插件的安装。

如果是离线的环境下安装,可以参考 《logstash 离线安装logstash-output-jdbc》

https://blog.csdn.net/sxw1065430201/article/details/123663108

Q.3 MHA log 文件夹中 原有一个 .health ,里面是MHA每分钟的健康性报告,那为什么还要自己写Python程序获取呢?

因为.healthy 的内容不是换行符结尾,而filebeat是以换行符来判断的(https://www.elastic.co/guide/en/beats/filebeat/7.4/newline-character-required-eof.html 有详细说明)。

简单来说,filebeat读取不了。

Q.4 MHA 健康性检查的原理

具体的原理可以参考此文章的分析说明--《mha检测mysql状况方式》

https://blog.csdn.net/weixin_35411438/article/details/113455263

Q.5 历史数据的删除

mha log 信息表 dbmha_log

MHA 基础配置表 dbmha_conf_info

以上两张表基本上很少变化,量不大,其数据无需定期删除。

运行状态表 dbmha_status,此表每个集群每分钟(具体crontab定义)都会有新数据插入,数据量增长较大,应设置定时任务,定期删除历史数据,例如删除7天前的数据。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK