实验目的 :

  1. 通过实验学习日志采集工具Flume 的安装和使用方法;
  2. 掌握采用Flume 作为Spark Streaming 数据源的编程方法。

一、Flume的安装与配置

# 1.创建文件夹
mkdir -p /usr/flume
# 2.解压文件
tar -zvxf apache-flume-1.7.0-bin.tar.gz -C /usr/flume/
# 3.重命名
cd /usr/flume/
mv ./apache-flume-1.7.0-bin/ ./flume-1.7.0

1.2 配置环境变量

vi /etc/profile
export FLUME_HOME=/usr/flume/flume-1.7.0
export PATH=$PATH:$FLUME_HOME/bin
export FLUME_CONF_DIR=$FLUME_HOME/conf
source /etc/profile

在这里插入图片描述

1.3 配置 flume-env.sh

# 在conf目录下
cd /usr/flume/flume-1.7.0/conf
# 复制为flume-env.sh
cp ./flume-env.sh.template ./flume-env.sh

# vi flume-env.sh
# 添加java路径
export JAVA_HOME=/usr/java/jdk1.8.0_171

在这里插入图片描述

二、使用Avro数据源测试 Flume

Avro 可以发送一个给定的文件给Flume,Avro 源使用AVRO RPC 机制。请对Flume 的相关配置文件进行设置, 从而可以实现如下功能:在一个终端中新建一个文件helloworld.txt(里面包含一行文本“Hello World”),在另外一个终端中启动Flume 以后, 可以把helloworld.txt 中的文本内容显示出来。

2.1 创建 avro.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

在这里插入图片描述

2.2 创建 helloworld.txtz

在此目录下 /usr/flume/flume-1.7.0

vi helloworld.txt
Hello World
Hello Flume

在这里插入图片描述

2.3 启动 agent

# 在此目录下 /usr/flume/flume-1.7.0
flume-ng agent -c /usr/flume/flume-1.7.0/conf -f  /usr/flume/flume-1.7.0/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

# -c:使用配置文件所在目录(这里指默认路径,即$FLUME_HOME/conf)
# -f:flume定义组件的配置文件 
# -n:启动Agent的名称,该名称在组件配置文件中定义 
# -Dflume.root.logger:flume自身运行状态的日志,按需配置,详细信息,控制台打印

在这里插入图片描述

在同目录下新建一个命令终端,使用avro-client发送文件

./bin/flume-ng avro-client -H localhost -p 4141 -F ./helloworld.txt

在这里插入图片描述

三、使用netcat 数据源测试Flume

请对Flume 的相关配置文件进行设置,从而可以实现如下功能:在一个Linux 终端(这里称为“Flume 终端”)中,启动Flume,在另一个终端(这里称为“Telnet 终端”)中, 输入命令“telnet localhost 44444”,然后,在Telnet 终端中输入任何字符,让这些字符可以顺利地在Flume 终端中显示出来。

3.1 创建 netcat.conf

cd /usr/flume/flume-1.7.0
vi ./conf/netcat.conf
# netcat.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

在这里插入图片描述
/usr/flume/flume-1.7.0 目录下 启动 flume agent 出现如下:

./bin/flume-ng agent --conf ./conf --conf-file ./conf/netcat.conf --name a1 -Dflume.root.logger=INFO,console

在这里插入图片描述
启动Flume,在另一个终端(这里称为“Telnet 终端”)中, 输入命令“telnet localhost 44444”

当输出错误:bash: telnet: command not found…
在这里插入图片描述
说明容器中没有telnet,需要下载:

yum list telnet*      	# 列出telnet相关的安装包
yum install telnet-server  # 安装telnet服务
yum install telnet.*  	# 安装telnet客户端

在这里插入图片描述
再输入:telnet localhost 44444
在这里插入图片描述
在第二个终端输入英文或字符,回车。在第一个终端会有显示。
在这里插入图片描述

四、使用Flume 作为Spark Streaming 数据源

Flume 是非常流行的日志采集系统,可以作为Spark Streaming 的高级数据源。请把Flume Source 设置为netcat 类型,从终端上不断给Flume Source 发送各种消息,Flume 把消息汇集到Sink,这里把Sink 类型设置为avro,由Sink 把消息推送给Spark Streaming,由自己编写的Spark Streaming 应用程序对消息进行处理。

4.1 配置Flume数据源

在conf目录下:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#receive message from port 33333
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 33333

#send message through port 44444
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 44444

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 1000000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1                      

在这里插入图片描述

  • 在上面的配置文件中,我们把Flume Source类别设置为netcat,绑定到master的33333端口,这样我们后面就可以通过“telnet master 33333”命令向Flume Source发送消息。

  • 同时,我们把Flume Sink类别设置为avro,绑定到localhost的44444端口,这样Flume Source把采集到的消息 (telnet master 33333) 汇集到Flume Sink以后,Sink会把消息推送给master 的44444端口,而我们编写的Spark Streaming程序一直在监听master 的44444端口,一旦有消息到达,就会被Spark Streaming应用程序取走进行处理。

4.2 下载spark-streaming的jar包

我下载的是spark-streaming-flume_2.11-2.4.7.jar,注意要下载对应的版本。下载地址如下:
https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.11
在这里插入图片描述

4.3 将下载的jar包导入spark的jars目录下

cd /usr/spark/spark-2.4.0-bin-hadoop2.7/jars
mkdir ./flume
# 将flume的lib目录下的jar包,复制到spark的jars下:
cp /usr/flume/flume-1.7.0/lib/* usr/spark/spark-2.4.0-bin-hadoop2.7/jars

注意,在复制过来是,如果有覆盖的,我选择替换原来,因为在复制过后可能会出现多个版本的jar包冲突,需要将冲突的包删去。

进入spark-shell,输入:

import org.apache.spark.streaming.flume._

在这里插入图片描述
表示配置OK!

4.4 IDEA添加依赖(要与自己的spark版本对应)

在之前配置hadoop,spark的基础上配置

	<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.11</artifactId>
            <version>2.4.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.7</version>
        </dependency>

注意: 依赖的版本要与hadoop、scala、spark 等版本相适应。

4.3 启动 Flume

 # 在此目录下 /usr/flume/flume-1.7.0
./bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console

由于我安装的版本有点问题运行版本错误,可以参考

爱小齐的玉齐博主的:实验3 Spark Streaming编程初级实践

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐