数据源

链接:https://pan.baidu.com/s/1TtBQpQUNBebqxrrx9czxqQ 
提取码:fmw6

源码在github:https://github.com/lidonglin-bit/Spark-Core

一.页面单跳转化率统计

需求简介

  • 计算页面单跳转化率,什么是页面单跳转换率,比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率
  • 比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率.
    在这里插入图片描述
  • 产品经理和运营总监,可以根据这个指标,去尝试分析,整个网站,产品,各个页面的表现怎么样,是不是需要去优化产品的布局;吸引用户最终可以进入最后的支付页面。
  • 数据分析师,可以此数据做更深一步的计算和分析。
  • 企业管理层,可以看到整个公司的网站,各个页面的之间的跳转的表现如何,可以适当调整公司的经营战略或策略。
  • 在该模块中,需要根据查询对象中设置的 Session 过滤条件,先将对应得 Session 过滤出来,然后根据查询对象中设置的页面路径,计算页面单跳转化率,比如查询的页面路径为:3、5、7、8,那么就要计算 3-5、5-7、7-8 的页面单跳转化率。
    需要注意的一点是,页面的访问时有先后的,要做好排序。

思路分析

  1. 读取到规定的页面
  2. 过滤出来规定页面的日志记录, 并统计出来每个页面的访问次数 countByKey 是行动算子 reduceByKey 是转换算子
  3. 明确哪些页面需要计算跳转次数 1-2, 2-3, 3-4 …
  4. 按照 session 统计所有页面的跳转次数, 并且需要按照时间升序来排序
  5. 按照 session 分组, 然后并对每组内的 UserVisitAction 进行排序
  6. 转换访问流水
  7. 过滤出来和统计目标一致的跳转
  8. 统计跳转次数
  9. 计算跳转率
    在这里插入图片描述

二.具体实现

具体业务实现

import java.text.DecimalFormat

import bean.UserVisitAction
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object PageConversion {
  def statPageConversionRate(sc:SparkContext,
                             UserVisitActionRDD: RDD[UserVisitAction],
                             pageString:String): Unit ={

    //1.做出来目标跳转流
    val pages = pageString.split(",")
    val prePages = pages.take(pages.length-1)
    val postPages = pages.takeRight(pages.length-1)
    //结果为List(1->2, 2->3, 3->4, 4->5, 5->6, 6->7)
    val targetPageFlows = prePages.zip(postPages).map {
      case (pre, post) => s"$pre->$post"
    }
    //1.1把targetpages做广播变量,优化性能
    val targetPageFlowBC = sc.broadcast(targetPageFlows)
    //2.计算分母,计算需要页面的点击量 Map(5 -> 3563, 1 -> 3640, 6 -> 3593, 2 -> 3559, 3 -> 3672, 4 -> 3602)
    val pageAndCount = UserVisitActionRDD.filter(action => {
      prePages.contains(action.page_id.toString)
    })
      .map(action => (action.page_id, 1))
      .countByKey()

    //3.计算分子
    //3.1 按照sessionId分组,不能先对需要的页面做过滤,否则会应用调整的逻辑
    val sessionGrouped: RDD[(String, Iterable[UserVisitAction])] = UserVisitActionRDD.groupBy(_.session_id)
    var pageFlowsRDD = sessionGrouped.flatMap {
      case (sid, actionit) =>
        //把每个session的行为做一个时间排序
        val actions: List[UserVisitAction] = actionit.toList.sortBy(_.action_time)
        val preActions = actions.take(actions.length - 1)
        val postActions = actions.takeRight(actions.length - 1)

        preActions.zip(postActions).map {
          case (preAction, postAction) => s"${preAction.page_id}->${postAction.page_id}"
        }.filter(flow => targetPageFlowBC.value.contains(flow)) //使用广播变量
    }

    //3.2聚合
    val pageFlowAndCount: collection.Map[String, Long] = pageFlowsRDD.map((_, 1)).countByKey()

    val f = new DecimalFormat(".00%")
    //4.计算跳转率
    val result: collection.Map[String, Any] = pageFlowAndCount.map {
          //pageAndCount分母
          //1->2  count/1的点击量
          case (flow, count) =>
            val rate = count.toDouble / pageAndCount(flow.split("->")(0).toLong)
            (flow,f.format(rate).toString)
        }
    println(result)

  }
/*
1,2,4,5,4,7  计算他们的跳转率
1.想办法做出来跳转流
      1->2,  2->3   3->4
2.计算跳转率
     1->2跳转率
     分子
         1->2跳转流的个数
            如何计算?
                1.保证是同一个session才能计算,其实就是按照session进行分组

                2.按照时间排序

                3.RDD["1->2","1->2"."2->3"]  map() reduceByKey
                  RDD[UserVisitAction]  map
                  RDD[1,2,3,4,5,6,7]
                  如果做跳转流
                  rdd1= RDD[1,2,3,4,5,6]
                  rdd2= RDD[2,3,4,5,6,7]
                  rdd3 = rdd1.zip(zip).map(...)
                  过滤出来目标跳转流,然后再聚合

     分母
        页面:1.的点击数
 */

完整项目代码

import bean.UserVisitAction
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ProjectApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("project")
    val sc = new SparkContext(conf)

    //从数据把文件读出
    val sourceRDD = sc.textFile("D:\\idea\\spark-knight1\\input\\user_visit_action.txt")

    //把数据封装号(封装到样例类中)
    val userVisitActionRDD: RDD[UserVisitAction] = sourceRDD.map(line => {
      val fields = line.split("_")
      UserVisitAction(
        fields(0),
        fields(1).toLong,
        fields(2),
        fields(3).toLong,
        fields(4),
        fields(5),
        fields(6).toLong,
        fields(7).toLong,
        fields(8),
        fields(9),
        fields(10),
        fields(11),
        fields(12).toLong)
    })

    //需求3
  PageConversion.statPageConversionRate(sc,userVisitActionRDD,"1,2,3,4,5,6,7")
    //关闭项目(sc)
    sc.stop()
  }

}

如果对DecimalFormat不了解,可以看这篇文章
https://blog.csdn.net/qq_46548855/article/details/108190085

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐