一、项目说明

1、需求

实时更新每个用户走的总步数;

每隔5s统计一次,包括某个用户新统计时的时间、所在地点、新增步数;

这里为了方便只将每个用户以及实时更新的步数总和两个维度保存到redis数据库中;

2、业务流程

首先造一些模拟数据实时传入kafka队列,然后sparkStreaming从kafka实时读取这些模拟数据并做相关分析,最终将分析结果存入redis;

3、大数据组件

kafka: kafka_2.10-0.10.2.1

spark: spark-2.2.0-bin-hadoop2.7

redis: redis-3.0.0

4、编译工具

IDEA: IntelliJ IDEA 2018.3.2 x64

二、实战代码

1、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cn</groupId>
    <artifactId>sparkSysLearn</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency><!-- Spark Streaming Kafka -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.6.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.2.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.gavaghan</groupId>
            <artifactId>geodesy</artifactId>
            <version>1.1.3</version>
        </dependency>
        <dependency>
            <groupId>com.github.scopt</groupId>
            <artifactId>scopt_2.11</artifactId>
            <version>3.7.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.2.4</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.codehaus.jettison/jettison -->
        <dependency>
            <groupId>org.codehaus.jettison</groupId>
            <artifactId>jettison</artifactId>
            <version>1.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/net.sf.json-lib/json-lib -->
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
            <classifier>jdk15</classifier>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.4.2</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>telecomeAnalysis-1.0.0</finalName>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.0</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.12.4</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <!-- <descriptor>src/main/resources/assembly.xml</descriptor> -->
                    <appendAssemblyId>false</appendAssemblyId>
                </configuration>
            </plugin>
            <!-- 拷贝依赖的jar包到lib目录 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>
                                ${project.build.directory}/lib
                            </outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2、模拟数据

package com.cn.util

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.codehaus.jettison.json.JSONObject

import scala.util.Random


/**
  * 编写一个提交数据到kafka集群的producer
  * 模拟场景:
  * 统计一些用户实时步行的总步数,每隔5s统计一次,包括某个用户新统计时的时间、所在地点、新增步数;
  */
object KafkaEventProducer {

  //用户
  private val users = Array(
    "zhangSan", "liSi",
    "wangWu", "xiaoQiang",
    "zhangFei", "liuBei",
    "guanYu", "maChao",
    "caoCao", "guanYu"
  )

  private var pointer = -1

  //随机获得用户
  def getUser(): String = {
    pointer = (pointer + 1) % users.length
    users(pointer)
  }

  //获取新增步数
  val random = new Random()

  def getNewStepNum(): Int = {
    random.nextInt(users.length)
  }

  //获取统计时间
  def getTime(): Long = {
    System.currentTimeMillis()
  }

  //获取行走地点
  val walkPlace = Array(
    "操场南门", "操场东门", "操场北门", "操场西门", "操场东南门", "操场西北门", "操场西南门", "操场东南北门"
  )

  def getWalkPlace(): String = {
    walkPlace(random.nextInt(walkPlace.length))
  }


  def main(args: Array[String]): Unit = {

    val topic = "topic_walkCount"
    val brokers = "master:6667,slaves1:6667,slaves2:6667"
    //设置属性,配置
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokers)
    props.setProperty("metadata.broker.list", brokers)
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    //生成producer对象
    val producer = new KafkaProducer[String, String](props)

    //传输数据
    while (true) {
      val event = new JSONObject()
      event.put("user", getUser())
        .put("count_time", getTime())
        .put("walk_place", getWalkPlace())
        .put("new_walkNum", getNewStepNum())
      println(event.toString())
      //发送数据
      producer.send(new ProducerRecord[String, String](topic, event.toString))
      Thread.sleep(5000)
    }
  }
}

3、redis工具类

package com.cn.util

import redis.clients.jedis.JedisPool

object RedisUtils {

  private val host = "master"
  private val port = 6379
  //private val poolConfig = new GenericObjectPoolConfig()
  lazy val pool = new JedisPool(host, port)

  //关闭
  lazy val hooks = new Thread() {
    override def run(): Unit = {
      println("Execute hook thread: " + this)
      pool.destroy()
    }
  }
}

4、业务处理代码

package com.cn.sparkStreaming

import com.cn.util.RedisUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.codehaus.jettison.json.JSONObject

/**
  * 统计一些用户实时步行的总步数,每隔5s统计一次,包括某个用户新统计时的时间、所在地点、新增步数;
  * 将每个用户以及实时更新的步数总和保存到redis数据库中;
  */
object kafka2sparkStreaming2redis {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka2sparkStreaming2redis")
      .setMaster("local[1]")
    //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //设置流数据每批的时间间隔为2s
    val ssc = new StreamingContext(conf, Seconds(2))
    //控制日志输出级别
    ssc.sparkContext.setLogLevel("WARN") //WARN,INFO,DEBUG
    ssc.checkpoint("checkpoint")
    val topic = "topic_walkCount"
    val groupId = "t03"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "master:6667,slaves1:6667,slaves2:6667",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest", // 初次启动从最开始的位置开始消费
      "enable.auto.commit" -> (false: java.lang.Boolean) // 自动提交设置为 false
    )

    val topics = Array(topic)
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, //均匀分发到executor
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    val dbIndex = 3;
    stream.foreachRDD(rdd => {
      // 获取每一个分区的消费的偏移量
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition(partitions => {
        partitions.foreach(records => {
          val record = new JSONObject(records.value())
          val user = record.getString("user")
          val countTime = record.getLong("count_time")
          val walkPlace = record.getString("walk_place")
          val newWalkNum = record.getInt("new_walkNum")

          //获取redis对象
          val jedis = RedisUtils.pool.getResource
          //redis密码
          jedis.auth("123456")
          //选择数据库
          jedis.select(dbIndex)
          val count = jedis.hincrBy("user_walknum", user, newWalkNum)
          println(count)
          RedisUtils.pool.returnResource(jedis)
        })
      })
      // 手动提交偏移量
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

三、运行及结果展示

1、模拟数据展示

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
{"user":"zhangSan","count_time":1582689943955,"walk_place":"操场北门","new_walkNum":5}
{"user":"liSi","count_time":1582689956236,"walk_place":"操场东门","new_walkNum":0}
{"user":"wangWu","count_time":1582689961236,"walk_place":"操场东南北门","new_walkNum":2}
{"user":"xiaoQiang","count_time":1582689966239,"walk_place":"操场东门","new_walkNum":6}
{"user":"zhangFei","count_time":1582689971240,"walk_place":"操场东门","new_walkNum":8}
{"user":"liuBei","count_time":1582689976240,"walk_place":"操场西南门","new_walkNum":5}
{"user":"guanYu","count_time":1582689981240,"walk_place":"操场东南门","new_walkNum":9}
{"user":"maChao","count_time":1582689986240,"walk_place":"操场北门","new_walkNum":6}
{"user":"caoCao","count_time":1582689991245,"walk_place":"操场东南北门","new_walkNum":2}
{"user":"guanYu","count_time":1582689996245,"walk_place":"操场西门","new_walkNum":0}
{"user":"zhangSan","count_time":1582690001246,"walk_place":"操场东门","new_walkNum":3}
{"user":"liSi","count_time":1582690006247,"walk_place":"操场北门","new_walkNum":2}

2、redis存储展示

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐