MapReduce实践编程

实验一

问题描述

根据用户手机上网的行为记录,基于 MapReduce编程模型设计程序统计不同手机号的用户使用的总流量。其中,数据记录的字段描述如下。

序号字段字段类型描述
0reportTimelong记录报告时间戳
1msisdnString手机号码
2apmacStringAP mac
3acmacStringAC mac
4hostString访问的网址
5siteTypeString网址种类
6upPackNumlong上行数据包数,单位:个
7downPackNumlong下行数据包数,单位:个
8upPayLoadlong上行总流量,要注意单位的转换:byte
9downPayLoadlong下行总流量。要注意单位的转换:byte
10httpStatusStringHTTP Response

数据文件具体内容如下:

1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 iface.qiyi.co 视频网站 24 27 2481 24681 200

1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 s19.cnzz.com 站点统计 4 0 264 0 200

1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 rank.ie.sogou.com 搜索引擎 2 4 132 1512 200

1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 4 0 240 0 200

1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200

1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 sug.so.360.cn 信息安全 18 15 1116 954 200

1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200

1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 input.shouji.sogou.com 搜索引擎 4 0 240 0 200

1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200

1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200

1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200

1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 rank.ie.sogou.com 搜索引擎 15 9 918 4938 200

1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 sug.so.360.cn 信息安全 3 3 180 180 200

1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200

1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 站点统计 12 12 3008 3720 200

1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200

1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200

1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200

1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 t3.baidu.com 搜索引擎 2 2 120 120 200

1363157985079 13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 t3.baidu.com 搜索引擎 6 3 360 180 200

1363157985069 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 18 138 1080 186852 200

实验过程

我们需要从数据中统计出每个用户的所有请求的使用的总流量,即统计用户所有请求的上行流量(索引为8)、下行流量(索引为9)之和。得到结果后输出到单独的文件中。

一、MapReduce程序编写

1、创建maven项目

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2、导入hadoop依赖包

在pom.xml中添加以下依赖:

   <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>

如图所示:
在这里插入图片描述
导入后记得ctrl+s保存一下。

3、创建类
3.1 Flow类

在这里插入图片描述
在这里插入图片描述

在Flow类中写入如下代码:

package com.njupt.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class Flow implements Writable{

    private String phone;     //手机号
    private long  up;       //上行流量
    private long down;     //下线流量

    private long sum;     //总流量

    //无参构造函数
    public Flow() {
    }

    //有参构造函数
    public Flow(String phone, long up, long down) {
        super();
        this.phone = phone;
        this.up = up;
        this.down = down;
        this.sum=this.up+this.down;
    }

    @Override
    public void write(DataOutput out) throws IOException {

        out.writeUTF(this.phone);
        out.writeLong(this.up);
        out.writeLong(this.down);
        out.writeLong(this.sum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {


        this.phone=in.readUTF();
        this.up=in.readLong();
        this.down=in.readLong();
        this.sum=in.readLong();

    }

    @Override
    public String toString() {

        return   this.up+"\t"+this.down+"\t"+this.sum;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public long getUp() {
        return up;
    }

    public void setUp(long up) {
        this.up = up;
    }

    public long getDown() {
        return down;
    }

    public void setDown(long down) {
        this.down = down;
    }
    public long getSum() {
        return sum;
    }
}

如图所示:
在这里插入图片描述
在这里插入图片描述
在FlowSumMapper类中输入以下代码:

package com.njupt.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.commons.lang3.StringUtils;

public class FlowSumMapper extends Mapper<LongWritable, Text, Text, Flow>{

    @Override
    protected void map(LongWritable key, Text value,
                       Context context)
            throws IOException, InterruptedException {
        //拿一行数据
        String line = value.toString();
        //切分成各个字段
        String[] fields = StringUtils.split(line, "\t");

        //拿到我们需要的字段
        String phone = fields[1];
        long  up= Long.parseLong(fields[8]);
        long  down = Long.parseLong(fields[9]);
        //封装数据为kv并输出        <phone:flow>
        context.write(new Text(phone), new Flow(phone,up,down));
    }
}

如图所示:
在这里插入图片描述

3.3 FlowSumReducer类

创建FlowSumReducer类

在这里插入图片描述
在FlowSumReducer中输入以下代码:

package com.njupt.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSumReducer extends Reducer<Text, Flow, Text, Flow> {

    @Override
    protected void reduce(Text key, Iterable<Flow> values,
                          Context context)
            throws IOException, InterruptedException {
        //  <phone:{flow,flow,flow,flow}>
        // reduce中的业务逻辑就是遍历values,然后进行累加求和再输出
        long up = 0;//
        long down = 0;
        for (Flow flow : values) {
            up += flow.getUp();
            down += flow.getDown();
        }
        context.write(key, new Flow(key.toString(), up, down));

    }

}


如图所示:
在这里插入图片描述

3.4 FlowSumRunner类

创建FlowSumRunner类

在这里插入图片描述
在FlowSumRunner类中输入以下代码:

package com.njupt.flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowSumRunner extends Configured implements Tool{


    @Override
    public int run(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumRunner.class);

        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        //设置map程序的输出key、value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Flow.class);

        //设置   输出 key、value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Flow.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));//输入数据路径     /flow/input

        //检查一下参数所指定的输出路径是否存在,如果已存在,先删除
        Path output = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output)){
            fs.delete(output, true);
        }

        FileOutputFormat.setOutputPath(job, new Path(args[1]));//输出数据路径   /flow/output

        return job.waitForCompletion(true)?0:1;
    }


    public static void main(String[] args) throws Exception {
        int  status = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
        System.exit(status);
    }
}

如图所示:
在这里插入图片描述

4、将项目打包成jar包

查看porn.xml中是否包含这行代码:

<packaging>jar</packaging>

如图所示:
在这里插入图片描述
将flowsum项目打包:
在这里插入图片描述
在这里插入图片描述
在终端出现build success即可,如图所示:
在这里插入图片描述
对flowsum进行刷新
在这里插入图片描述
此时target文件夹中就会出现打包好的jar包。
在这里插入图片描述
将该jar包改名成fs.jar,放入一个你可以找到的文件夹中。
在这里插入图片描述

二、提交到hadoop集群测试

1、 创建数据

创建一个记事本文件,命名为“data.txt”。

将数据复制到该文件中并保存,如图所示:

在这里插入图片描述

2、启动hadoop集群
2.1 启动虚拟机node1、node2、node3

在这里插入图片描述

2.2 启动hadoop

输入以下命令:

start-dfs.sh
start-yarn.sh

如图所示:

在这里插入图片描述

2.3 在浏览器打开hadoop界面

http://192.168.198.130:9870 (其中192.168.198.130换成自己的node1的IP地址)

在这里插入图片描述

查看文件夹:

在这里插入图片描述

3、测试MapReduce
3.1 用Xftp上传数据文件和jar包

在这里插入图片描述

3.2 node1上cd到/export/server/hadoop-3.3.0/share/hadoop/mapreduce目录
cd /export/server/hadoop-3.3.0/share/hadoop/mapreduce
3.3 输入以下命令在hdfs上创建/flow/input文件夹
hdfs dfs -mkdir -p /flow/input

如图所示:

在这里插入图片描述

可以在浏览器查看创建情况

在这里插入图片描述

3.4 输入以下命令,将data.txt上传到input文件夹中
hdfs dfs -put data.txt /flow/input

如图所示:

在这里插入图片描述

可以在浏览器查看上传情况:

在这里插入图片描述

3.5 输入以下命令运行fs.jar统计不同手机号的用户使用的总流量
hadoop jar fs.jar com.njupt.flowsum.FlowSumRunner /flow/input /flow/output

如图所示:

在这里插入图片描述

3.6 可以输入以下命令查看输出结果
hdfs dfs -text /flow/output/part-r-00000

如图所示:

在这里插入图片描述
image-20231020202354304.png&pos_id=img-ha3JxHz7-1698633023279)

也可以在浏览器下载结果文件:

在这里插入图片描述
20231020202416090.png&pos_id=img-2pQkSVHA-1698633023279)

在这里插入图片描述
在这里插入图片描述

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐