Use Cases for Side Outputs (SideOutput) in Flink
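1. What SideOutput streams are for
Flink side outputs serve two purposes:
(1) Splitting and filtering. A side output can do the job of the filter operator, routing different types of records from one source into separate streams. Splitting a source with filter operators instead forwards every record to every filter, so the stream is effectively copied once per filter, which wastes resources.
(2) Handling late data. In event-time window computations, records that arrive too late for their window can be sent to a side output and processed there, so late data is not lost.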
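Point (1) can be illustrated with a plain-Java model that has no Flink dependency. It assumes the four priceType values from the sample file below and uses hypothetical names; a `Map` of lists stands in for Flink's main output and its `OutputTag` side outputs. The model only counts how many times records are examined: with one filter per branch, every branch scans the whole input, while the side-output style examines each record once and routes it directly. Real Flink jobs also pay for forwarding and serializing each copy, which this model does not capture.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SideOutputRoutingSketch {

    // priceType values of the four records in orderLog.txt
    static final List<String> PRICE_TYPES = List.of("new", "normal", "back", "normal");
    // Output names: "main" for the main stream, "side-<type>" for side outputs (hypothetical naming)
    static final List<String> BRANCHES = List.of("main", "side-new", "side-back");

    /** Where a record with this priceType should end up. */
    static String target(String priceType) {
        return "normal".equals(priceType) ? "main" : "side-" + priceType;
    }

    /** Filter style: each branch scans the whole input. Returns how many records were examined. */
    static int filterStyle(List<String> records, List<String> branches, Map<String, List<String>> out) {
        int examined = 0;
        for (String branch : branches) {
            for (String r : records) {
                examined++;
                if (branch.equals(target(r))) {
                    out.computeIfAbsent(branch, k -> new ArrayList<>()).add(r);
                }
            }
        }
        return examined;
    }

    /** Side-output style: one scan, each record routed straight to its output. */
    static int sideOutputStyle(List<String> records, Map<String, List<String>> out) {
        int examined = 0;
        for (String r : records) {
            examined++;
            out.computeIfAbsent(target(r), k -> new ArrayList<>()).add(r);
        }
        return examined;
    }

    public static void main(String[] args) {
        Map<String, List<String>> viaFilters = new LinkedHashMap<>();
        Map<String, List<String>> viaSideOutputs = new LinkedHashMap<>();
        System.out.println("filters examined " + filterStyle(PRICE_TYPES, BRANCHES, viaFilters) + " records");          // 12
        System.out.println("side outputs examined " + sideOutputStyle(PRICE_TYPES, viaSideOutputs) + " records"); // 4
        System.out.println(viaSideOutputs); // {side-new=[new], main=[normal, normal], side-back=[back]}
    }
}
```

Both styles produce the same grouping; only the amount of work differs, and the gap grows with the number of branches.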
2. SideOutput examples
The input file orderLog.txt contains:
{"orderId":"20201011231245423","skuId":"1226354","priceType":"new","requestTime":"1599931359071"}
{"orderId":"20201011231254678","skuId":"1226322","priceType":"normal","requestTime":"1599931359024"}
{"orderId":"20201011231212768","skuId":"1226324","priceType":"back","requestTime":"1599931359011"}
{"orderId":"20201011231234567","skuId":"1226351","priceType":"normal","requestTime":"1599931359073"}
The order entity OrderLog (Lombok's @Data generates the getters used below):
@Data
class OrderLog {
    private String orderId;
    private String skuId;
    private String priceType;
    private Long requestTime;
}
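(1) Splitting and filtering
Route orders with different price types out of the main stream so that each type can be processed separately.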
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import com.alibaba.fastjson.JSON;
import lombok.Data;

public class TestSideOutputStream {
    // One OutputTag per side output; the TypeInformation tells Flink the element type of that output
    public static final OutputTag<OrderLog> NEW_PRICE = new OutputTag<>("NEW_PRICE", TypeInformation.of(OrderLog.class));
    public static final OutputTag<OrderLog> BACK_FLOW_PRICE = new OutputTag<>("BACK_FLOW_PRICE", TypeInformation.of(OrderLog.class));

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> textDataStream = env.readTextFile("C:\\Users\\admin\\Desktop\\orderLog.txt");
        // Split by price type: normal-price orders stay on the main stream,
        // new-customer and returning-customer (back-flow) orders go to side outputs
        SingleOutputStreamOperator<OrderLog> streamOperator = textDataStream.process(new SideOutPutProcessFunction());
        streamOperator.print("main-normal");
        streamOperator.getSideOutput(NEW_PRICE).print("side-new");
        streamOperator.getSideOutput(BACK_FLOW_PRICE).print("side-back");
        env.execute();
    }
}

// Parses each JSON line and routes the order to the main output or to one of the side outputs
class SideOutPutProcessFunction extends ProcessFunction<String, OrderLog> {
    private static final long serialVersionUID = -6632888020403733197L;

    @Override
    public void processElement(String value, ProcessFunction<String, OrderLog>.Context ctx,
            Collector<OrderLog> collect) throws Exception {
        OrderLog orderLog = JSON.parseObject(value, OrderLog.class);
        if ("normal".equals(orderLog.getPriceType())) {
            collect.collect(orderLog);                                   // main stream
        } else if ("new".equals(orderLog.getPriceType())) {
            ctx.output(TestSideOutputStream.NEW_PRICE, orderLog);        // side output: new-customer price
        } else if ("back".equals(orderLog.getPriceType())) {
            ctx.output(TestSideOutputStream.BACK_FLOW_PRICE, orderLog);  // side output: returning-customer price
        }
    }
}
Output:
main-normal:3> OrderLog(orderId=20201011231234567, skuId=1226351, priceType=normal)
side-back:2> OrderLog(orderId=20201011231212768, skuId=1226324, priceType=back)
side-new:1> OrderLog(orderId=20201011231245423, skuId=1226354, priceType=new)
main-normal:1> OrderLog(orderId=20201011231254678, skuId=1226322, priceType=normal)
The number before ">" is the index of the parallel subtask that printed the line, so the order of the lines can differ between runs.
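(2) Handling late window data
sideOutputLateData routes late records to a side output, and getSideOutput retrieves that stream so the late records can be processed further.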
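Which records actually reach the late-data side output can be modelled in plain Java. This is a simplified sketch, not Flink's implementation: it uses abstract time units instead of milliseconds, tumbling windows with no offset, and a watermark updated after every record as "largest timestamp seen minus maxOutOfOrderness" (BoundedOutOfOrdernessTimestampExtractor computes the same value but emits it periodically, so a real watermark can lag behind). A record goes to the side output only when its window's last timestamp plus allowedLateness is already at or behind the watermark; before that point it is still added to its window. The class and enum names are hypothetical.

```java
public class LatenessSketch {

    enum Fate { WINDOW_STILL_OPEN, WITHIN_ALLOWED_LATENESS, SIDE_OUTPUT }

    private final long windowSize;        // tumbling window size, offset 0
    private final long maxOutOfOrderness; // as passed to the timestamp extractor
    private final long allowedLateness;   // as passed to allowedLateness(...)
    private long maxTimestampSeen = Long.MIN_VALUE;

    LatenessSketch(long windowSize, long maxOutOfOrderness, long allowedLateness) {
        this.windowSize = windowSize;
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.allowedLateness = allowedLateness;
    }

    /** Simplified bounded-out-of-orderness watermark: largest timestamp seen minus the allowed disorder. */
    long currentWatermark() {
        return maxTimestampSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestampSeen - maxOutOfOrderness;
    }

    /** Classifies one record against the watermark produced by the records before it. */
    Fate accept(long timestamp) {
        long watermark = currentWatermark();
        long windowStart = timestamp - Math.floorMod(timestamp, windowSize);
        long windowMaxTimestamp = windowStart + windowSize - 1; // last time unit inside the window
        Fate fate;
        if (windowMaxTimestamp > watermark) {
            fate = Fate.WINDOW_STILL_OPEN;          // watermark has not reached the window end yet
        } else if (windowMaxTimestamp + allowedLateness > watermark) {
            fate = Fate.WITHIN_ALLOWED_LATENESS;    // window state still kept; record is added to the window
        } else {
            fate = Fate.SIDE_OUTPUT;                // window cleaned up; record goes to sideOutputLateData
        }
        maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
        return fate;
    }

    public static void main(String[] args) {
        // window size 10, out-of-orderness 3, allowed lateness 5
        LatenessSketch sketch = new LatenessSketch(10, 3, 5);
        for (long ts : new long[] {12, 25, 18, 40, 15}) {
            System.out.println(ts + " -> " + sketch.accept(ts));
        }
        // 12 -> WINDOW_STILL_OPEN, 25 -> WINDOW_STILL_OPEN, 18 -> WITHIN_ALLOWED_LATENESS,
        // 40 -> WINDOW_STILL_OPEN, 15 -> SIDE_OUTPUT
    }
}
```

Record 18 belongs to window [10, 19]; the watermark is 22, past the window end but within 19 + 5, so it is still accepted. Record 15 falls in the same window, but by then the watermark is 37, so only the side output sees it.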
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import com.alibaba.fastjson.JSON;
import lombok.Data;
// OrderCountSingleTrigger (listed below) is assumed to be in the same package; otherwise import it from its own package

public class TestSideOutputStream {
    // Tag for records that arrive after the window end plus the allowed lateness
    public static final OutputTag<OrderLog> LATE_DATA = new OutputTag<>("LATE_DATA", TypeInformation.of(OrderLog.class));

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event time; required on Flink versions before 1.12, where processing time is the default
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> textDataStream = env.readTextFile("C:\\Users\\admin\\Desktop\\orderLog.txt");
        SingleOutputStreamOperator<OrderLog> dayPvDataStream = textDataStream
                .flatMap(new SideOutPutMapFunction())
                .assignTimestampsAndWatermarks(new AssignedWaterMarks(Time.seconds(3))) // tolerate up to 3 s of out-of-order events
                .keyBy(OrderLog::getOrderId)
                .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(16)))  // 1-day windows; the 16 h offset puts boundaries at midnight UTC+8
                .trigger(OrderCountSingleTrigger.of(1))                              // fire (and purge) on every record
                .allowedLateness(Time.minutes(30))                                   // keep window state 30 min past the window end
                .sideOutputLateData(TestSideOutputStream.LATE_DATA)                  // records later than that go to the side output
                .process(new SideOutPutWindowProcessFunction());
        dayPvDataStream.addSink(new SideOutPutSinkFunction());
        // Process the late records every 3 seconds. They are already behind the watermark, so an
        // event-time window would drop them again; a processing-time window is used instead.
        dayPvDataStream.getSideOutput(TestSideOutputStream.LATE_DATA)
                .keyBy(OrderLog::getOrderId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
                .process(new SideOutPutWindowProcessFunction())
                .addSink(new SideOutPutSinkFunction());
        env.execute();
    }
}

/**
 * Watermark assigner for out-of-order events: the watermark trails the largest requestTime seen by maxOutOfOrderness
 */
class AssignedWaterMarks extends BoundedOutOfOrdernessTimestampExtractor<OrderLog> {
    private static final long serialVersionUID = 2021421640499388219L;

    public AssignedWaterMarks(Time maxOutOfOrderness) {
        super(maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(OrderLog orderLog) {
        return orderLog.getRequestTime(); // event time = requestTime (epoch milliseconds)
    }
}

/**
 * Parses each JSON line into an OrderLog
 */
class SideOutPutMapFunction extends RichFlatMapFunction<String, OrderLog> {
    private static final long serialVersionUID = -6478853684295335571L;

    @Override
    public void flatMap(String value, Collector<OrderLog> out) throws Exception {
        OrderLog orderLog = JSON.parseObject(value, OrderLog.class);
        out.collect(orderLog);
    }
}

/**
 * Window function: emits every record in the window unchanged
 */
class SideOutPutWindowProcessFunction extends ProcessWindowFunction<OrderLog, OrderLog, String, TimeWindow> {
    private static final long serialVersionUID = -6632888020403733197L;

    @Override
    public void process(String key, ProcessWindowFunction<OrderLog, OrderLog, String, TimeWindow>.Context ctx,
            Iterable<OrderLog> elements, Collector<OrderLog> collect) throws Exception {
        for (OrderLog orderLog : elements) {
            collect.collect(orderLog);
        }
    }
}

/**
 * Sink function
 */
class SideOutPutSinkFunction extends RichSinkFunction<OrderLog> {
    private static final long serialVersionUID = -6632888020403733197L;

    @Override
    public void invoke(OrderLog orderLog, Context context) throws Exception {
        // Put your own storage / computation logic here
        System.out.println(JSON.toJSONString(orderLog));
    }
}
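About the window definition `TumblingEventTimeWindows.of(Time.days(1), Time.hours(16))`: event-time windows are aligned to the Unix epoch (UTC), so a plain 1-day window runs from 00:00 to 24:00 UTC. A 16-hour offset moves the boundaries to 16:00 UTC, which is midnight in UTC+8 (China Standard Time), assuming that is the intent. The plain-Java sketch below (hypothetical class name, no Flink dependency) reproduces the window-start calculation that Flink's `TimeWindow.getWindowStartWithOffset` performs and checks the result with `java.time`, using the requestTime of the first sample record.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

public class DailyWindowOffsetSketch {

    static final long DAY = TimeUnit.DAYS.toMillis(1);
    static final long OFFSET = TimeUnit.HOURS.toMillis(16);

    /** Start of the tumbling window (size windowSize, shifted by offset) that contains timestamp. */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long requestTime = 1599931359071L; // requestTime of the first record in orderLog.txt
        long start = windowStart(requestTime, OFFSET, DAY);
        System.out.println(Instant.ofEpochMilli(requestTime).atZone(ZoneOffset.ofHours(8))); // 2020-09-13T01:22:39.071+08:00
        System.out.println(Instant.ofEpochMilli(start).atZone(ZoneOffset.ofHours(8)));       // 2020-09-13T00:00+08:00
    }
}
```

Flink's documentation uses `Time.hours(-8)` for the same purpose; both offsets yield the same boundaries because they differ by exactly one window size (24 hours).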
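The OrderCountSingleTrigger used above is listed below. It counts elements per key and window and fires and purges the window each time the count reaches maxCount, so with of(1) the window function runs once for every record: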
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * Fires and purges the window each time maxCount elements have arrived (modelled on Flink's CountTrigger)
 */
public class OrderCountSingleTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long maxCount;
    // Per-key, per-window element counter
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private OrderCountSingleTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE; // evaluate the window, then drop its buffered elements
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE; // firing is purely count-based
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
    }

    @Override
    public String toString() {
        return "OrderCountSingleTrigger(" + maxCount + ")";
    }

    public static <W extends Window> OrderCountSingleTrigger<W> of(long maxCount) {
        return new OrderCountSingleTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}
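To see what `OrderCountSingleTrigger.of(n)` does to a window's contents, here is a plain-Java model of its `onElement` logic (no Flink involved; the class and field names are hypothetical). As in Flink's window operator, the element is buffered in the window first and the trigger is consulted afterwards; a counter standing in for the "count" ReducingState is incremented, and when it reaches `maxCount` the buffered elements are handed to the window function (FIRE), then the buffer (PURGE) and the counter are cleared.

```java
import java.util.ArrayList;
import java.util.List;

public class CountTriggerSketch {

    private final long maxCount;
    private long count;                                            // stands in for the "count" ReducingState
    private final List<String> windowContents = new ArrayList<>(); // elements currently buffered in the window
    final List<List<String>> firings = new ArrayList<>();          // what the window function receives on each FIRE

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    /** Buffer the element, then apply the trigger's count logic. */
    void onElement(String element) {
        windowContents.add(element);
        count++;                                          // count.add(1L)
        if (count >= maxCount) {
            count = 0;                                    // count.clear()
            firings.add(new ArrayList<>(windowContents)); // FIRE: run the window function on the buffer
            windowContents.clear();                       // PURGE: discard the buffered elements
        }
    }

    public static void main(String[] args) {
        CountTriggerSketch trigger = new CountTriggerSketch(2);
        for (String e : List.of("a", "b", "c", "d", "e")) {
            trigger.onElement(e);
        }
        System.out.println(trigger.firings); // [[a, b], [c, d]]  ("e" is still buffered)
    }
}
```

With `maxCount` set to 1, as in the job above, every firing contains exactly one element, so each order is passed through the window function on its own as soon as it arrives.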