Flume安装部署以及简单案例入门
注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来识别文件。若遇到文件定期更改文件名,并且重新创建一个新原始文件的名字的文件,监控到并上传的数据将是累积的文件内容,并不是更新的内容数据,导致数据重复。修改源码中更新和读取的操作,然后将修改好的文
目录
1)在job目录下创建文件:file-flume-hdfs.conf
3)开启Hadoop 和 Hive 并操作 Hive 产生日志
3.1、需求:使用Flume监听整个目录的文件,并上传到HDFS
3.3.1、在job目录下创建配置文件dir-flume-hdfs.conf,并添加以下内容
3.3.3、向 upload 文件夹中添加文件,在/opt/module/flume-1.9.0/ 目录下创建 upload 文件夹,并添加以下文件
4.1、需求:使用Flume监听整个目录的实时追加文件,并上传到HDFS
4.3.1、在job目录下创建配置文件taildir-flume-hdfs.conf,并添加以下内容
4.3.3、向files文件加追加内容,在/opt/module/flume-1.9.0目录下创建files和files2文件夹,向该文件夹中添加文件
一、Flume安装部署
1、Flume 相关地址
(1) Flume 官网地址http://flume.apache.org/
(2)文档查看地址http://flume.apache.org/FlumeUserGuide.html
(3)下载地址http://archive.apache.org/dist/flume/
2、安装部署
(1)将 apache-flume-1.9.0-bin.tar.gz 上传到 linux 的/opt/software 目录下。
(2)解压 apache-flume-1.9.0-bin.tar.gz 到/opt/module/目录下。
tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
(3)修改 apache-flume-1.9.0-bin 的名称为 flume-1.9.0
mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume-1.9.0
(4) 将 flume-1.9.0/conf 下 的 flume-env.sh.template 文 件 修 改 为 flume-env.sh , 并配 置flume-env.sh 文件:
mv flume-env.sh.template flume-env.sh
vim flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_271
(5) 将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3
[kgf@hadoop104 flume-1.9.0]$ rm -rf /opt/module/flume-1.9.0/lib/guava-11.0.2.jar
二、Flume入门案例
1、监控端口数据官方案例
1.1、需求:
使用Flume监听一个端口,收集该端口数据,并打印到控制台。
1.2、分析
通过netcat工具向本机的44444端口发送数据–>Flume监控本机的44444端口,通过Flume的source端读取数据–>Flume把获取的数据通过SInk端写出到控制台。
1.3、步骤如下
1.3.1、安装使用netcat
//下载netcat
sudo yum install -y nc
//开启服务端端口
nc -lk 9999
//开启服务端进行通信
nc localhost 9999
1.3.2、判断44444端口是否被占用
sudo netstat -nlp | grep 44444
1.3.3、在flume-1.9.0下创建job文件夹,在job下创建Flume Agent配置文件netcat-flume-logger.conf(表示数据源-经过flume-到sink),添加以下内容
# Name the components on this agent
#组件声明
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port =44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel whichbuffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1.3.4、开启flume监听端口
//方式一
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
//方式二
bin/flume-ng agent -c conf/ -n a1 -f job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
参数说明:
- (i)–conf/-c:配置文件在conf/目录。
- (ii)–name/-n:表示给agent起名为a1。
- (iii)–conf-file/-f:flume本次启动读取的配置文件在job目录下的netcat-flume-logger.conf文件。
- (iv)-Dflume.root.logger-INFO,console:-D 表示 flume 运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、 error。
1.3.5、使用netcat向本机的44444端口发送内容
nc localhost 44444
hello
1.3.6、在Flume监听页面观察接受数据情况
2、实时监控单个追加文件
2.1、需求:
实时监控Hive日志,并上传到HDFS中。
2.2、分析
2.3、步骤
1)在job目录下创建文件:file-flume-hdfs.conf
要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于 Hive日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行 Linux 命令来读取文件。
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
#定义source类型为exec可执行命令的
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round= true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件(单位s)
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channelwhich buffers eventsin memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
2)运行Flume
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-flume-hdfs.conf
//或用
bin/flume-ng agent -n a2 -c conf/ -f job/file-flume-hdfs.conf
3)开启Hadoop 和 Hive 并操作 Hive 产生日志
Exec source 适用于监控一个实时追加的文件。
缺点:tail只能传最后10条数据,不支持断点续传
3、实时监控目录下多个新文件
3.1、需求:使用Flume监听整个目录的文件,并上传到HDFS
3.2、分析
3.3、步骤
3.3.1、在job目录下创建配置文件dir-flume-hdfs.conf,并添加以下内容
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
#定义source类型为目录
a3.sources.r3.type = spooldir
#监控目录
a3.sources.r3.spoolDir = /opt/module/flume-1.9.0/upload
#定义文件上传完的后缀
a3.sources.r3.fileSuffix = .COMPLETED
#是否有文件头
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type= hdfs
a3.sinks.k3.hdfs.path =hdfs://hadoop102/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round= true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize= 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
3.3.2、启动监控文件命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-flume-hdfs.conf
在使用 Spooling Directory Source 时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED 结尾;被监控文件夹每 500 毫秒扫描一次文件变动。
3.3.3、向 upload 文件夹中添加文件,在/opt/module/flume-1.9.0/ 目录下创建 upload 文件夹,并添加以下文件
touch test.txt
touch text.tmp
touch test.log
Spooldir Source适合用于同步新文件。
缺点:不适合对实时追加日志的文件进行监听并同步
4、实时监控目录下的多个追加文件
Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。
Taildir 说明:
Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。
Position File 的格式如下:
{"inode":2496272,"pos":12,"file":"/opt/module/flume-1.9.0/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume-1.9.0/files/file2.txt"}
注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来识别文件。
4.1、需求:使用Flume监听整个目录的实时追加文件,并上传到HDFS
4.2、分析
4.3、步骤
4.3.1、在job目录下创建配置文件taildir-flume-hdfs.conf,并添加以下内容
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
#定义source类型
a3.sources.r3.type = TAILDIR
#指定position——file的位置
a3.sources.r3.positionFile = /opt/module/flume-1.9.0/tail_dir.json
a3.sources.r3.filegroups = f1 f2
#定义监控目录文件
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*
# Describe the sink
a3.sinks.k3.type= hdfs
a3.sinks.k3.hdfs.path =hdfs://hadoop102/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize= 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval= 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channelwhich buffers eventsin memory
a3.channels.c3.type= memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the sourceand sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
4.3.2、启动监控文件夹命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/taildir-flume-hdfs.conf
4.3.3、向files文件加追加内容,在/opt/module/flume-1.9.0目录下创建files和files2文件夹,向该文件夹中添加文件
#files文件夹下
touch file1.txt
touch file2.txt
echo hello >> file1.txt
echo lyx >> file2.txt#files2文件夹下
touch file3.log
echo hi >> file3.log
4.3.4、查看HDFS上的数据
上述存在一个问题:
若遇到文件定期更改文件名,并且重新创建一个新原始文件的名字的文件,监控到并上传的数据将是累积的文件内容,并不是更新的内容数据,导致数据重复。
解决:
修改源码中更新和读取的操作,然后将修改好的文件打包下载到本地,再上传到lib目录下进行源码替换,重新启动监控文件夹命令
(1)更新操作:只看inode的值。
(2)读取操作:只要文件是新文件,只看inode的值,不看绝对路径。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)