4

Flume

 3 years ago
source link: https://www.jansora.com/notes/584
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Flume 简介

Flume 是一个分布式、高可靠、高可用的用来收集、聚合、转移不同来源的大量日志数据到中央数据仓库的工具.

Flume 还具有强大的流机制, 大多数场景下 Flume 作为数据流的起点, 通过 kafka 等消息队列将数据流原因不断的传输到其他流计算或者 Hdfs, hbase 等存储节点, 用作实时分析计算或者离线数据仓库存储等.

Flume 详解

  • Event 直译为事件, 动作, Event 描述一个原子数据单元从输入端到输出端的过程, 但 Event 只是描述了数据流动的事件, 事实上 Event 从输入端流到输出端的事件的全过程是由 Agent 来控制的.
  • Agent 直译为代理, Agent 代理了 Event 的行为, Agent就是一个Flume的实例,本质是一个JVM进程,该JVM进程控制Event数据流从外部日志生产者那里传输到目的地(或者是下一个Agent)。

那么 Agent 是怎么控制数据流动的呢? 通过Source, Channel, Sink来控制的. Source 输入数据, Channel 暂存数据, Slink 输出数据

  • Source. source 用来接收 Event 数据作为 Flume 的输入源, 满足 Thrift协议 的 Event 数据都能被source识别和接收. source 接收到数据后转存到 Channel.
  • Channel. Channel 用来存储 Source 接收到的数据, 直至被 Slink 消费到为止.
  • Slink. Slink用来消费Channel存储的数据, 并将数据输出到消息队列 kafka 或 存储 Hbase, Hdfs, 或 ........

Source 输入数据

Source是数据的收集端,负责将数据捕获后进行特殊的格式化,将数据封装到事件(event) 里,然后将事件推入Channel中。 Flume提供了各种source的实现,包括Avro Source、Exce Source、Spooling Directory Source、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、HTTP Source、HDFS Source,etc。如果内置的Source无法满足需要, Flume还支持自定义Source。

Channel 数据存储

Channel是连接Source和Sink的组件,大家可以将它看做一个数据的缓冲区(数据队列),它可以将事件暂存到内存中也可以持久化到本地磁盘上, 直到Sink处理完该事件。 Flume对于 Channel,则提供了Memory Channel、JDBC Chanel、File Channel,etc。

  • MemoryChannel可以实现高速的吞吐,但是无法保证数据的完整性。
  • MemoryRecoverChannel在官方文档的建议上已经建义使用FileChannel来替换。
  • FileChannel保证数据的完整性与一致性。在具体配置不现的FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。

Slink 输出数据

Flume Sink取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。 Flume也提供了各种sink的实现,包括HDFS sink、Logger sink、Avro sink、File Roll sink、Null sink、HBase sink,etc。 Flume Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。

Flume 业务定位

Flume 处于数据链的最顶端, 对接数据产生方, 负责将数据生产方产生的数据(日志, 流水等)以流式实时传递出去.

Hadoop业务的整体开发流程图

flume-location.png

Flume 数据流

基础流模型

Flume 作为一个流式应用程序, 没有什么比图来的更为直观的了.

下图是 Flume 基础流模型内部数据流转图

数据流转方向 WebServer.log -> Source -> Channel -> Slink -> HDFS

flume-stream.png

复杂流模型

下图是 Flume 复杂流模型内部数据流转图

这是个分布式数据流采集流程, 每个Web应用程序都绑定了一个 Flume 收集日志, 这些 Flume 把采集到的日志汇总到一个主 Flume. 主 Flume 最终落地到HDFS flume-multi-stream.png

Flume 安装

  1. 下载. https://flume.apache.org/download.html
  2. 解压 tar zxvf apache-flume-{version}-bin.tar.gz

3.配置 JAVA_HOME

cd apache-flume-1.9.0-bin/conf
cp flume-env.sh.template flume-env.sh
vim flume-env.sh

首行加入export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 4. 测试版本 ./bin/flume-ng version

Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9

Flume 配置

具体如何配置 source, channel, sink, 请参考官网, 这里就一一不拷贝了 https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html 下面简单介绍几个Demo

Flume 实战演练

采集 Nginx 日志并打印到控制台

  1. 配置 flume-nginx-console.conf
# 名为 nginx 的 Agent, 配置 source, channel, sink
nginx.sources = source
nginx.channels = channel
nginx.sinks = sink

# 配置source 
# terminal 读取 access.log
nginx.sources.source.type = exec
nginx.sources.source.command = tail -f /var/log/nginx/access.log
nginx.sources.source.channels = channel

# 配置channel
# 类型 内存channel , 最大容量为1000, 最大事务容量为100
nginx.channels.channel.type = memory 
nginx.channels.channel.capacity = 1000
nginx.channels.channel.transactionCapacity = 100

# 配置sink 
#输入 channel, 输出类型为logger输出
nginx.sinks.sink.channel = channel
nginx.sinks.sink.type = logger

  1. 启动 flume-ng

bin/flume-ng agent --conf conf --conf-file conf/flume-nginx-console.conf --name nginx -Dflume.root.logger=INFO,console

  1. 另开一个终端, 执行curl localhost, 查看 flume 是否打印access日志

采集 Nginx 日志并发送到kafka

  1. 配置 flume-nginx-kafka.conf
# 名为 nginx 的 Agent, 配置 source, channel, sink
nginx.sources = source
nginx.channels = channel
nginx.sinks = sink

# 配置source 
# terminal 读取 access.log
nginx.sources.source.type = exec
nginx.sources.source.command = tail -f /var/log/nginx/access.log
nginx.sources.source.channels = channel

# 配置channel
# 类型 内存channel , 最大容量为1000, 最大事务容量为100
nginx.channels.channel.type = memory 
nginx.channels.channel.capacity = 1000
nginx.channels.channel.transactionCapacity = 100

# 配置sink 
#输入 channel, 输出类型为kafka
nginx.sinks.sink.channel = channel
nginx.sinks.sink.type = org.apache.flume.sink.kafka.KafkaSink
nginx.sinks.sink.kafka.topic = nginx
nginx.sinks.sink.kafka.bootstrap.servers = localhost:9092
nginx.sinks.sink.kafka.flumeBatchSize = 20
nginx.sinks.sink.kafka.producer.acks = 1
nginx.sinks.sink.kafka.producer.linger.ms = 1
nginx.sinks.sink.kafka.producer.compression.type = snappy
  1. 启动 flume-ng

bin/flume-ng agent --conf conf --conf-file conf/flume-nginx-console.conf --name nginx -Dflume.root.logger=INFO,console 3. 另开一个终端, 执行curl localhost, 查看 kafka 是否收集到access日志


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK