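The UVStatMultiPlans (GitHub) project keeps collecting high-performance real-time UV (unique visitor) implementations and compares the strengths and weaknesses of each!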

Requirement

For every page, count the UV (number of distinct users) per minute.
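Before looking at the streaming job, it helps to pin down exactly what is being counted. The sketch below is a hypothetical batch reference and is not part of the repository. It buckets events into one-minute tumbling windows the way Flink does for non-negative timestamps with zero offset (window start = ts - ts % 60000). It keys each bucket by page and exclusive window end, then counts distinct user IDs. The `Event` class is an assumed stand-in for the project's `UserBehavior`. A reference like this can be used to cross-check the streaming output on a small sample.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical batch reference for the requirement: distinct users per page per one-minute window. */
public class PageUvReference {

    static final long WINDOW_MS = 60_000L;

    /** Hypothetical minimal event standing in for the project's UserBehavior class. */
    static final class Event {
        final String userId;
        final String page;
        final long timeMs;

        Event(String userId, String page, long timeMs) {
            this.userId = userId;
            this.page = page;
            this.timeMs = timeMs;
        }
    }

    /** Returns "page@windowEnd" -> UV, where windowEnd is exclusive like Flink's TimeWindow.getEnd(). */
    static Map<String, Integer> uvPerPagePerMinute(List<Event> events) {
        Map<String, Set<String>> usersPerBucket = new TreeMap<>();
        for (Event e : events) {
            // Tumbling-window assignment for non-negative timestamps with zero offset.
            long windowStart = e.timeMs - (e.timeMs % WINDOW_MS);
            String bucket = e.page + "@" + (windowStart + WINDOW_MS);
            usersPerBucket.computeIfAbsent(bucket, k -> new HashSet<>()).add(e.userId);
        }
        Map<String, Integer> uv = new TreeMap<>();
        usersPerBucket.forEach((bucket, users) -> uv.put(bucket, users.size()));
        return uv;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("u1", "home", 1_000L),
                new Event("u1", "home", 30_000L),  // same user, same minute: counted once
                new Event("u2", "home", 59_999L),
                new Event("u1", "home", 60_000L),  // falls into the next minute
                new Event("u3", "cart", 5_000L));
        System.out.println(uvPerPagePerMinute(events));
    }
}
```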

Implementation
The complete code has been uploaded to: https://github.com/xl-xueling/uvstatmultiplans.git
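```java
// Uses the legacy FlinkKafkaConsumer (deprecated in newer Flink releases in favor of KafkaSource).
// UserBehavior, PageUVResult, SysConst, JsonUtil, DateUtil and TimeIntervalTrigger come from
// the project repository linked above; their imports are omitted here.
import java.util.HashSet;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

public class UVStatPlan1 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);
        // Deprecated since Flink 1.12, where event time is already the default.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Periodic watermarks are emitted only every 10 minutes, so a window's final event-time
        // firing can lag by up to that long; interim results come from the custom trigger below.
        env.getConfig().setAutoWatermarkInterval(TimeUnit.MINUTES.toMillis(10));
        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", SysConst.KAFKA_BOOTSTRAP_SERVERS);
        // A fresh group id per run plus "latest": only messages produced after startup are read.
        kafkaProperties.setProperty("group.id", "groupId_" + System.currentTimeMillis());
        kafkaProperties.setProperty("auto.offset.reset", "latest");
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(SysConst.KAFKA_TOPIC_NAME, new SimpleStringSchema(), kafkaProperties);
        DataStream<UserBehavior> dataStream = env.addSource(consumer)
                .map(x -> {
                    // Parse the JSON message; a malformed message yields null.
                    UserBehavior userBehavior = null;
                    try {
                        userBehavior = JsonUtil.toJavaObject(x, UserBehavior.class);
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                    return userBehavior;
                })
                // Drop unparseable messages; a null record would throw in the timestamp assigner.
                .filter(Objects::nonNull)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<UserBehavior>forMonotonousTimestamps()
                                .withTimestampAssigner((SerializableTimestampAssigner<UserBehavior>)
                                        (userBehavior, l) -> userBehavior.getBehaviorTime()));
        dataStream
                .keyBy((KeySelector<UserBehavior, String>) UserBehavior::getPage)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                // Custom trigger from the project repository, configured with a 5-second interval.
                .trigger(new TimeIntervalTrigger<>(5, TimeUnit.SECONDS))
                .aggregate(new UVStatAggregate(), new WindowResultFunction())
                // Format each result and print it through Flink's print sink.
                .map(x -> "key:" + x.page
                        + ",window time:" + DateUtil.formatTimeStamp(x.windowTime, "yyyy-MM-dd HH:mm:ss")
                        + ",uv:" + x.uv)
                .print();
        env.execute();
    }

    /** Attaches the page key and the window end to the single pre-aggregated UV count. */
    public static class WindowResultFunction implements WindowFunction<Integer, PageUVResult, String, TimeWindow> {

        @Override
        public void apply(
                String key,
                TimeWindow window,
                Iterable<Integer> aggregateResult,
                Collector<PageUVResult> collector
        ) throws Exception {
            Integer count = aggregateResult.iterator().next();
            collector.collect(PageUVResult.of(key, window.getEnd(), count));
        }
    }

    /** The accumulator is the set of distinct user IDs seen for one page in one window. */
    public static class UVStatAggregate implements AggregateFunction<UserBehavior, Set<String>, Integer> {

        @Override
        public Set<String> createAccumulator() {
            return new HashSet<>();
        }

        @Override
        public Set<String> add(UserBehavior userBehavior, Set<String> accumulator) {
            accumulator.add(userBehavior.getUserId());
            return accumulator;
        }

        @Override
        public Integer getResult(Set<String> accumulator) {
            return accumulator.size();
        }

        @Override
        public Set<String> merge(Set<String> a, Set<String> b) {
            a.addAll(b);
            return a;
        }
    }
}
```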
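To make the `AggregateFunction` lifecycle used by `UVStatAggregate` concrete without a Flink cluster, the plain-Java sketch below replays it by hand. `SimpleAggregate` is a hypothetical local interface that only mirrors the four methods of Flink's `AggregateFunction`; it is not the Flink type. For brevity, the input is a bare user-ID string instead of `UserBehavior`. The sketch builds two partial accumulators and merges them. This shows that repeat visits by the same user do not inflate the count and that `merge` is a set union. In Flink, `merge` is invoked for merging window assigners such as session windows. With the tumbling windows above it is typically not exercised, but it should still be correct.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UvAggregateDemo {

    /** Hypothetical stand-in mirroring the four methods of Flink's AggregateFunction<IN, ACC, OUT>. */
    interface SimpleAggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** Same logic as UVStatAggregate, with a user-ID string as input. */
    static final class UvAggregate implements SimpleAggregate<String, Set<String>, Integer> {
        public Set<String> createAccumulator() { return new HashSet<>(); }
        public Set<String> add(String userId, Set<String> acc) { acc.add(userId); return acc; }
        public Integer getResult(Set<String> acc) { return acc.size(); }
        public Set<String> merge(Set<String> a, Set<String> b) { a.addAll(b); return a; }
    }

    /** Aggregates two partitions separately, merges the accumulators, and returns the UV. */
    static int uvOf(List<String> part1, List<String> part2) {
        UvAggregate agg = new UvAggregate();
        Set<String> acc1 = agg.createAccumulator();
        for (String userId : part1) {
            acc1 = agg.add(userId, acc1);
        }
        Set<String> acc2 = agg.createAccumulator();
        for (String userId : part2) {
            acc2 = agg.add(userId, acc2);
        }
        return agg.getResult(agg.merge(acc1, acc2));
    }

    public static void main(String[] args) {
        // u1 visits twice and u2 appears in both partitions; each is counted once.
        System.out.println(uvOf(Arrays.asList("u1", "u2", "u1"), Arrays.asList("u2", "u3")));
    }
}
```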
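Pros and cons of this implementation
  • Pros:
    Simple to implement.
  • Cons:
    Because the accumulator is a Set, every distinct user ID of a page is kept in the window state (in memory with a heap-based state backend) until the window closes. Memory use therefore grows linearly with the number of distinct users per page per window, and becomes large when the user volume in a window is high.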
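To put a rough number on that memory cost, the back-of-envelope estimator below models one `HashSet<String>` accumulator, covering one page in one window. All object sizes are assumptions, not measurements. It assumes a 64-bit HotSpot JVM with compressed oops, compact strings (JDK 9+), ASCII user IDs, and the default 0.75 load factor. Real footprints vary by JVM and state backend, and Flink keeps one such set per page per open window.

```java
/**
 * Back-of-envelope heap estimate for one HashSet<String> accumulator (one page, one window).
 * ASSUMED sizes, not measurements: 64-bit HotSpot, compressed oops, compact strings (JDK 9+),
 * ASCII user IDs, default HashMap load factor 0.75. Ignores the fixed-size HashSet/HashMap objects.
 */
public class UvSetMemoryEstimate {

    /** Rounds a size up to the assumed 8-byte object alignment. */
    static long align8(long bytes) {
        return (bytes + 7) / 8 * 8;
    }

    static long estimateBytes(long distinctUsers, int idLength) {
        long node = 32;                      // HashMap.Node: 12-byte header, int hash, 3 refs, padded
        long string = 24;                    // String: header + value ref + hash + coder, padded
        long chars = align8(16 + idLength);  // backing byte[]: header + length + 1 byte per ASCII char
        // HashMap doubles its bucket table (starting at 16) until size <= 0.75 * capacity.
        long capacity = 16;
        while (capacity * 3 / 4 < distinctUsers) {
            capacity <<= 1;
        }
        long table = align8(16 + 4 * capacity); // bucket array of 4-byte compressed references
        return distinctUsers * (node + string + chars) + table;
    }

    public static void main(String[] args) {
        for (long n : new long[] {10_000L, 1_000_000L, 10_000_000L}) {
            System.out.printf("%,d users x 16-char IDs -> ~%,d bytes%n", n, estimateBytes(n, 16));
        }
    }
}
```

Under these assumptions, 1,000,000 distinct 16-character IDs come to 96,388,624 bytes (about 92 MiB) for a single page-minute. That is consistent with the drawback noted above.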