Flink中的keyBy操作是用于对数据进行分区的一种操作,‌它通过指定一个或多个键(‌key)‌将数据流逻辑上划分为不同的分区,‌以便进行并行处理。‌

在Flink中,‌当我们需要对海量数据进行聚合处理时,‌通常会先进行分区,‌以提高处理效率。‌keyBy操作就是实现这一目的的关键步骤。‌通过keyBy操作,‌我们可以根据指定的键将数据流划分为不同的分区,‌每个分区内的数据将发送到同一个分区进行处理。‌这种分区的方式是通过计算键的哈希值,‌并通过对分区数取模运算来实现的。‌因此,‌具有相同键的数据会被发送到同一个分区进行处理,‌从而实现数据的并行聚合。‌

在指定键的方式上,‌Flink提供了多种灵活性。‌例如,‌对于Tuple数据类型,‌可以指定字段的位置或多个位置的组合作为键;‌对于POJO类型,‌可以指定字段的名称;‌此外,‌还可以使用Lambda表达式或实现KeySelector接口来定义从数据中提取键的逻辑。‌这种灵活性使得keyBy操作能够适应各种复杂的数据处理场景。‌

此外,‌窗口操作是Flink中另一个重要的概念,‌用于处理无界数据流。‌窗口操作可以将连续的数据流切割成有限大小的多个“存储桶”,‌每个数据都会被分发到对应的桶中。‌当达到窗口结束时间时,‌对每个桶中收集的数据进行计算处理。‌这种机制使得Flink能够高效地处理实时数据流,‌实现数据的聚合和统计。‌

综上所述,‌Flink中的keyBy操作是数据处理中不可或缺的一步,‌它通过指定键对数据进行分区,‌以实现并行处理和聚合操作的高效执行。‌同时,‌结合窗口操作,‌Flink能够更好地处理实时数据流,‌满足各种数据处理需求

对数据分组主要是为了进行后续的聚合操作,即对同组数据进行聚合分析。keyBy会将一个DataStream转化为一个KeyedStream

绝大多数情况,我们要根据事件的某种属性或数据的某个字段进行分组,对一个分组内的数据进行处理。如下图所示,keyBy算子根据元素的形状对数据进行分组,相同形状的元素被分到了一起,可被后续算子统一处理。比如,多支股票数据流处理时,可以根据股票代号进行分组,然后对同一股票代号的数据统计其价格变动。又如,电商用户行为日志把所有用户的行为都记录了下来,如果要分析某一个用户行为,需要先按用户ID进行分组

keyBy算子将DataStream转换成一个KeyedStreamKeyedStream是一种特殊的DataStream,事实上,KeyedStream继承了DataStreamDataStream的各元素随机分布在各Task Slot中,KeyedStream的各元素按照Key分组,分配到各Task Slot中。我们需要向keyBy算子传递一个参数,以告知Flink以什么字段作为Key进行分组

DataStream<Tuple2<String, Integer>> dataStream = ...;
// 使用KeyBy算子将数据流分区
DataStream<Tuple2<String, Integer>> keyedStream = dataStream.keyBy(0); // 根据第一个元素作为Key

比如如下需求假设我们有一份订单数据流,包含订单ID、用户ID和订单金额三个字段,我们希望根据用户ID将数据流分区,并对每个用户的订单金额求和。代码如下所示:

// 定义订单数据类
public class Order {
    public String orderId;
    public String userId;
    public double amount;
    
    public Order(String orderId, String userId, double amount) {
        this.orderId = orderId;
        this.userId = userId;
        this.amount = amount;
    }
}
// 生成订单数据流
List<Order> orders = new ArrayList<>();
orders.add(new Order("1", "user1", 200.0));
orders.add(new Order("2", "user2", 200.0));
orders.add(new Order("3", "user1", 300.0));
orders.add(new Order("4", "user2", 400.0));
DataStream<Order> orderStream = env.fromCollection(orders);
// 使用KeyBy算子将数据流按照用户ID分区,并对每个用户的订单金额求和
DataStream<Tuple2<String, Double>> sumStream = orderStream
    .keyBy(order -> order.userId) // 根据用户ID作为Key进行分区
    .sum("amount"); // 对每个分区的订单金额求和
// 输出分区后的结果
sumStream.print();

在上述代码中,我们首先定义了一个Order类,用于表示订单数据。然后生成一份包含4个订单的数据流,并使用KeyBy算子将数据流按照用户ID分区。接着,我们调用sum算子对每个分区的订单金额求和,并将计算结果打印输出。 运行代码后,我们可以得到如下的输出结果:

(user1, 500.0)
(user2, 600.0)

从输出结果可以看出,KeyBy算子将订单数据流按照用户ID分成了两个分区,分别对应用户user1和user2,而sum算子则对每个分区的订单金额进行了求和。这样,我们就成功地完成了基于Key的聚合操作。 需要注意的是,KeyBy算子只能将数据流按照指定的Key进行分区,而无法对分区进行任何修改。如果需要对分区进行修改或者调整,可以使用其他分区算子,例如Shuffle、Rebalance等。

实例1:要求按照名称分组,相同名称的放在一个分区里,有数据如下:

abc,1
jek,2
abc,3

代码如下:

package flink.transform.keyby;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**abc,1
 * jek,2
 * abc,3
 * 分组,第一个逗号前面的单词相同的分在一个线程里,以姓名为key进行分组
 *
 */
public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> numbersString = env.socketTextStream("127.0.0.1",9999);
        KeyedStream<String,String> keyedStream =numbersString.keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String s) throws Exception {
                return s.split(",")[0];
            }
        });
        keyedStream.print();
        env.execute("Flink Filter Example");
    }
}

实例2:要求按照名称分组,并分装在person类里,数据如下

tom,23
rose,28
tom,25
package flink.transform.keyby;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * tom,23
 * rose,28
 * tom,25
 * 通过MAP操作封装成JAVAbean ,映射变换
 * keyby分组
 */
public class Test1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> numbersString = env.socketTextStream("127.0.0.1",9999);
        DataStream<Person> ss = numbersString.map(new MapFunction<String, Person>() {
            @Override
            public Person map(String s) throws Exception {
                String[] array = s.split(",");
                Person person = new Person();
                person.setName(array[0]);
                person.setSex(Integer.parseInt(array[1]));
                return person;
            }
        });
        KeyedStream<Person,String> sss = ss.keyBy(new KeySelector<Person, String>() {
            @Override
            public String getKey(Person person) throws Exception {
                return person.getName();
            }
        });
        sss.print();
        env.execute("Flink Filter Example");
    }
}

实例3,要求按照名称分组,组装成tupple元组,需要熟悉Tupple2的用法

数据如下:

tom,23
rose,28
tom,25
代码如下:
package flink.transform.keyby;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
 * tom,23
 * rose,28
 * tom,25
 * 通过MAP操作映射变换成元组
 *
 */
public class Test2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> numbersString = env.socketTextStream("127.0.0.1",9999);
        DataStream<Tuple2<String, Integer>> ss = numbersString.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] array = s.split(",");

                return new Tuple2<>(array[0],Integer.parseInt(array[1]));
            }
        });
        KeyedStream<Tuple2<String,Integer>,String> sss = ss.keyBy(new KeySelector<Tuple2<String,Integer>, String>() {
            @Override
            public String getKey(Tuple2<String,Integer> tp) throws Exception {
                return tp.f0 ;
            }
        });
        sss.print();
        env.execute("Flink Filter Example");
    }
}

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐