Flume

Flume 简介

Flume 是一个分布式、高可靠、高可用的用来收集、聚合、转移不同来源的大量日志数据到中央数据仓库的工具.

Flume 还具有强大的流机制, 大多数场景下 Flume 作为数据流的起点, 通过 kafka 等消息队列将数据流原因不断的传输到其他流计算或者 Hdfs, hbase 等存储节点, 用作实时分析计算或者离线数据仓库存储等.

Flume 详解

基本概念

  • Event 直译为事件, 动作, Event 描述一个原子数据单元从输入端到输出端的过程, 但 Event 只是描述了数据流动的事件, 事实上 Event 从输入端流到输出端的事件的全过程是由 Agent 来控制的.
  • Agent 直译为代理, Agent 代理了 Event 的行为, Agent就是一个Flume的实例,本质是一个JVM进程,该JVM进程控制Event数据流从外部日志生产者那里传输到目的地(或者是下一个Agent)。

那么 Agent 是怎么控制数据流动的呢? 通过Source, Channel, Sink来控制的. Source 输入数据, Channel 暂存数据, Slink 输出数据

  • Source. source 用来接收 Event 数据作为 Flume 的输入源, 满足 Thrift协议 的 Event 数据都能被source识别和接收. source 接收到数据后转存到 Channel.
  • Channel. Channel 用来存储 Source 接收到的数据, 直至被 Slink 消费到为止.
  • Slink. Slink用来消费Channel存储的数据, 并将数据输出到消息队列 kafka 或 存储 Hbase, Hdfs, 或 ........

Source 输入数据

Source是数据的收集端,负责将数据捕获后进行特殊的格式化,将数据封装到事件(event) 里,然后将事件推入Channel中。
Flume提供了各种source的实现,包括Avro Source、Exce Source、Spooling Directory Source、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、HTTP Source、HDFS Source,etc。如果内置的Source无法满足需要, Flume还支持自定义Source。

Channel 数据存储

Channel是连接Source和Sink的组件,大家可以将它看做一个数据的缓冲区(数据队列),它可以将事件暂存到内存中也可以持久化到本地磁盘上, 直到Sink处理完该事件。
Flume对于 Channel,则提供了Memory Channel、JDBC Chanel、File Channel,etc。

  • MemoryChannel可以实现高速的吞吐,但是无法保证数据的完整性。
  • MemoryRecoverChannel在官方文档的建议上已经建义使用FileChannel来替换。
  • FileChannel保证数据的完整性与一致性。在具体配置不现的FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。

Slink 输出数据

Flume Sink取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。
Flume也提供了各种sink的实现,包括HDFS sink、Logger sink、Avro sink、File Roll sink、Null sink、HBase sink,etc。
Flume Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。

Flume 业务定位

Flume 处于数据链的最顶端, 对接数据产生方, 负责将数据生产方产生的数据(日志, 流水等)以流式实时传递出去.

Hadoop业务的整体开发流程图

flume-location.png

Flume 数据流

基础流模型

Flume 作为一个流式应用程序, 没有什么比图来的更为直观的了.

下图是 Flume 基础流模型内部数据流转图
数据流转方向 WebServer.log -> Source -> Channel -> Slink -> HDFS

复杂流模型

下图是 Flume 复杂流模型内部数据流转图
这是个分布式数据流采集流程, 每个Web应用程序都绑定了一个 Flume 收集日志, 这些 Flume 把采集到的日志汇总到一个主 Flume. 主 Flume 最终落地到HDFS

Flume 安装

  1. 下载. https://flume.apache.org/download.html
  2. 解压 tar zxvf apache-flume-{version}-bin.tar.gz
    3.配置 JAVA_HOME
cd apache-flume-1.9.0-bin/conf
cp flume-env.sh.template flume-env.sh
vim flume-env.sh

首行加入export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
4. 测试版本 ./bin/flume-ng version

Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9

Flume 配置

具体如何配置 source, channel, sink, 请参考官网, 这里就一一不拷贝了 https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
下面简单介绍几个Demo

Flume 实战演练

采集 Nginx 日志并打印到控制台

  1. 配置 flume-nginx-console.conf
# 名为 nginx 的 Agent, 配置 source, channel, sink
nginx.sources = source
nginx.channels = channel
nginx.sinks = sink

# 配置source 
# terminal 读取 access.log
nginx.sources.source.type = exec
nginx.sources.source.command = tail -f /var/log/nginx/access.log
nginx.sources.source.channels = channel

# 配置channel
# 类型 内存channel , 最大容量为1000, 最大事务容量为100
nginx.channels.channel.type = memory 
nginx.channels.channel.capacity = 1000
nginx.channels.channel.transactionCapacity = 100

# 配置sink 
#输入 channel, 输出类型为logger输出
nginx.sinks.sink.channel = channel
nginx.sinks.sink.type = logger

  1. 启动 flume-ng
    bin/flume-ng agent --conf conf --conf-file conf/flume-nginx-console.conf --name nginx -Dflume.root.logger=INFO,console

  2. 另开一个终端, 执行curl localhost, 查看 flume 是否打印access日志

采集 Nginx 日志并发送到kafka

  1. 配置 flume-nginx-kafka.conf
# 名为 nginx 的 Agent, 配置 source, channel, sink
nginx.sources = source
nginx.channels = channel
nginx.sinks = sink

# 配置source 
# terminal 读取 access.log
nginx.sources.source.type = exec
nginx.sources.source.command = tail -f /var/log/nginx/access.log
nginx.sources.source.channels = channel

# 配置channel
# 类型 内存channel , 最大容量为1000, 最大事务容量为100
nginx.channels.channel.type = memory 
nginx.channels.channel.capacity = 1000
nginx.channels.channel.transactionCapacity = 100

# 配置sink 
#输入 channel, 输出类型为kafka
nginx.sinks.sink.channel = channel
nginx.sinks.sink.type = org.apache.flume.sink.kafka.KafkaSink
nginx.sinks.sink.kafka.topic = nginx
nginx.sinks.sink.kafka.bootstrap.servers = localhost:9092
nginx.sinks.sink.kafka.flumeBatchSize = 20
nginx.sinks.sink.kafka.producer.acks = 1
nginx.sinks.sink.kafka.producer.linger.ms = 1
nginx.sinks.sink.kafka.producer.compression.type = snappy
  1. 启动 flume-ng
    bin/flume-ng agent --conf conf --conf-file conf/flume-nginx-console.conf --name nginx -Dflume.root.logger=INFO,console
  2. 另开一个终端, 执行curl localhost, 查看 kafka 是否收集到access日志

评论栏