I. What SideOutput streams are for

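Flink side outputs serve two purposes:

(1) Splitting and filtering. A side output can do the job of the filter operator and split the different kinds of records in a source into separate streams. Splitting a source with filter operators means every filter receives its own full copy of the stream, which costs performance for no benefit.

(2) Handling late data. In event-time window computations, records that arrive too late for their window can be collected and processed separately, so late data is not lost.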
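To make point (1) concrete, here is a plain-Java counting model. It is not Flink code, and `RoutingModel` and its methods are illustrative names. With one filter per price type, every branch receives and inspects the whole input, so k branches cost k passes over the data; a single function that routes each record to one output (which is what a ProcessFunction with side outputs does) inspects each record once. The model counts record inspections for the price types "new", "normal", "back", "normal".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java counting model, not Flink code: it only counts how often records are inspected.
class RoutingModel {
    // Filter style: each branch scans the complete input on its own.
    static int splitWithFilters(List<String> priceTypes, List<String> branches, Map<String, List<String>> out) {
        int inspections = 0;
        for (String branch : branches) {
            List<String> selected = new ArrayList<>();
            for (String type : priceTypes) {
                inspections++;
                if (branch.equals(type)) {
                    selected.add(type);
                }
            }
            out.put(branch, selected);
        }
        return inspections;
    }

    // Side-output style: a single pass routes each record to exactly one output.
    static int splitInOnePass(List<String> priceTypes, Map<String, List<String>> out) {
        int inspections = 0;
        for (String type : priceTypes) {
            inspections++;
            out.computeIfAbsent(type, k -> new ArrayList<>()).add(type);
        }
        return inspections;
    }

    public static void main(String[] args) {
        List<String> priceTypes = Arrays.asList("new", "normal", "back", "normal");
        Map<String, List<String>> viaFilters = new HashMap<>();
        Map<String, List<String>> viaRouting = new HashMap<>();
        System.out.println(splitWithFilters(priceTypes, Arrays.asList("normal", "new", "back"), viaFilters)); // 12
        System.out.println(splitInOnePass(priceTypes, viaRouting));                                          // 4
        System.out.println(viaFilters.equals(viaRouting));                                                   // true
    }
}
```

Both approaches produce the same groups; the difference is that the filter version touches every record once per branch.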

II. SideOutput examples

The input file orderLog.txt contains one JSON order per line:

{"orderId":"20201011231245423","skuId":"1226354","priceType":"new","requestTime":"1599931359071"}
{"orderId":"20201011231254678","skuId":"1226322","priceType":"normal","requestTime":"1599931359024"}
{"orderId":"20201011231212768","skuId":"1226324","priceType":"back","requestTime":"1599931359011"}
{"orderId":"20201011231234567","skuId":"1226351","priceType":"normal","requestTime":"1599931359073"}

The order entity OrderLog (Lombok's @Data generates its getters, setters and toString):

@Data
class OrderLog {
	private String orderId;
	
	private String skuId;
	
	private String priceType;
	
	private Long requestTime;
}

(1) Splitting and filtering

Orders with different price types are separated from the main stream and handled individually.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import com.alibaba.fastjson.JSON;
import lombok.Data;

public class TestSideOutputStream {
    public static final OutputTag<OrderLog> NEW_PRICE = new OutputTag<>("NEW_PRICE", TypeInformation.of(OrderLog.class));
    public static final OutputTag<OrderLog> BACK_FLOW_PRICE = new OutputTag<>("BACK_FLOW_PRICE", TypeInformation.of(OrderLog.class));

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> textDataStream = env.readTextFile("C:\\Users\\admin\\Desktop\\orderLog.txt");
        // Route orders by price type: normal-price orders stay in the main stream,
        // new-user and returning-user orders go to side outputs
        SingleOutputStreamOperator<OrderLog> streamOperator = textDataStream.process(new SideOutPutProcessFunction());

        streamOperator.print("main-normal");
        streamOperator.getSideOutput(NEW_PRICE).print("side-new-user");
        streamOperator.getSideOutput(BACK_FLOW_PRICE).print("side-returning-user");

        env.execute();
    }
}

class SideOutPutProcessFunction extends ProcessFunction<String, OrderLog> {
	private static final long serialVersionUID = -6632888020403733197L;

	@Override
	public void processElement(String value, ProcessFunction<String, OrderLog>.Context ctx,
			Collector<OrderLog> collect) throws Exception {
		OrderLog orderLog = JSON.parseObject(value, OrderLog.class);
		if ("normal".equals(orderLog.getPriceType())) {
			// Normal price: main stream
			collect.collect(orderLog);
		} else if ("new".equals(orderLog.getPriceType())) {
			// New-user price: side output NEW_PRICE
			ctx.output(TestSideOutputStream.NEW_PRICE, orderLog);
		} else if ("back".equals(orderLog.getPriceType())) {
			// Returning-user price: side output BACK_FLOW_PRICE
			ctx.output(TestSideOutputStream.BACK_FLOW_PRICE, orderLog);
		}
		// Records with any other price type are dropped
	}

}

Output (the number before ">" is the index of the parallel subtask that printed the record, so the order can differ between runs):

main-normal:3> OrderLog(orderId=20201011231234567, skuId=1226351, priceType=normal)
side-returning-user:2> OrderLog(orderId=20201011231212768, skuId=1226324, priceType=back)
side-new-user:1> OrderLog(orderId=20201011231245423, skuId=1226354, priceType=new)
main-normal:1> OrderLog(orderId=20201011231254678, skuId=1226322, priceType=normal)

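(2) Handling late data in windows

sideOutputLateData sends records that are too late for their window to a side output, and getSideOutput retrieves that stream so the late records can be processed further.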
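Whether a record lands in the late-data side output depends on the watermark and on allowedLateness. The following plain-Java model (not Flink code; `LatenessModel` and its methods are illustrative names) applies that rule to a tumbling event-time window using the parameters of the job that follows: a 3-second out-of-orderness bound and 30 minutes of allowed lateness. The watermark is modeled the way BoundedOutOfOrdernessTimestampExtractor computes it (highest timestamp seen minus the bound); the newer `WatermarkStrategy.forBoundedOutOfOrderness` subtracts one extra millisecond. The model also assumes the watermark is always up to date, whereas Flink emits periodic watermarks at an interval (200 ms by default), so in a running job the watermark can trail the newest record slightly.

```java
// Plain-Java model of Flink's lateness rule, not Flink code.
// The constants mirror the parameters of the example job.
class LatenessModel {
    static final long ALLOWED_LATENESS = 30L * 60 * 1000;  // allowedLateness(Time.minutes(30))
    static final long MAX_OUT_OF_ORDERNESS = 3_000L;       // AssignedWaterMarks(Time.seconds(3))

    // BoundedOutOfOrdernessTimestampExtractor: watermark = highest timestamp seen so far - bound
    static long watermark(long maxSeenTimestamp) {
        return maxSeenTimestamp - MAX_OUT_OF_ORDERNESS;
    }

    // windowEnd is exclusive, so the last timestamp belonging to the window is windowEnd - 1.
    // Once that timestamp plus the allowed lateness is at or behind the watermark,
    // records for the window are routed to the late-data side output.
    static boolean isLate(long windowEnd, long maxSeenTimestamp) {
        return (windowEnd - 1) + ALLOWED_LATENESS <= watermark(maxSeenTimestamp);
    }

    public static void main(String[] args) {
        long midnight = 1599926400000L; // 2020-09-13 00:00:00 in UTC+8, end of the previous day's window
        // A record stamped 2020-09-12 23:59:00 (UTC+8) belongs to the window ending at midnight.
        System.out.println(isLate(midnight, midnight + 20 * 60_000L)); // false: watermark is 00:19:57, within 30 min
        System.out.println(isLate(midnight, midnight + 40 * 60_000L)); // true: goes to sideOutputLateData
    }
}
```

Note that the record's own timestamp only matters through the window it belongs to: every record of that window becomes late at the same moment.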

import java.util.Iterator;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import com.alibaba.fastjson.JSON;
import com.jd.ubb.feed.flow.data.compute.trigger.OrderCountSingleTrigger;

import lombok.Data;

public class TestSideOutputStream {
    // Tag for records that arrive after their window's allowed lateness has expired
    public static final OutputTag<OrderLog> LATE_DATA = new OutputTag<>("LATE_DATA", TypeInformation.of(OrderLog.class));

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Before Flink 1.12 processing time was the default and periodic watermarks were not emitted
        // unless event time was enabled; since 1.12 this call is deprecated but harmless
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> textDataStream = env.readTextFile("C:\\Users\\admin\\Desktop\\orderLog.txt");

        SingleOutputStreamOperator<OrderLog> dayPvDataStream = textDataStream
                .flatMap(new SideOutPutMapFunction())
                .assignTimestampsAndWatermarks(new AssignedWaterMarks(Time.seconds(3))) // tolerate up to 3 s of out-of-order events
                .keyBy(OrderLog::getOrderId)
                // 1-day windows; the 16-hour offset aligns window boundaries with midnight in UTC+8
                .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(16)))
                .trigger(OrderCountSingleTrigger.of(1))   // fire once per incoming record
                .allowedLateness(Time.minutes(30))        // accept records up to 30 minutes late
                .sideOutputLateData(LATE_DATA)            // anything later goes to the side output
                .process(new SideOutPutWindowProcessFunction());

        dayPvDataStream.addSink(new SideOutPutSinkFunction());

        // Retrieve the late records and process them separately.
        // Late records keep their original timestamps and the watermark is forwarded to the side
        // output as well, so an event-time window here would treat them as late again and drop them;
        // a processing-time window aggregates them every 3 seconds instead.
        dayPvDataStream.getSideOutput(LATE_DATA)
                .keyBy(OrderLog::getOrderId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
                .process(new SideOutPutWindowProcessFunction())
                .addSink(new SideOutPutSinkFunction());

        env.execute();
    }
}

/**
 * Watermark assigner that tolerates out-of-order event times
 */
class AssignedWaterMarks extends BoundedOutOfOrdernessTimestampExtractor<OrderLog> {
	private static final long serialVersionUID = 2021421640499388219L;
	
	public AssignedWaterMarks(Time maxOutOfOrderness) {
		super(maxOutOfOrderness);
	}

    @Override
    public long extractTimestamp(OrderLog orderLog) {
        return orderLog.getRequestTime();
    }
}

/**
 * Parses each JSON line into an OrderLog
 */
class SideOutPutMapFunction extends RichFlatMapFunction<String, OrderLog> {
	private static final long serialVersionUID = -6478853684295335571L;

	@Override
	public void flatMap(String value, Collector<OrderLog> out) throws Exception {
		OrderLog orderLog = JSON.parseObject(value, OrderLog.class);
		out.collect(orderLog);
	}
	
}

/**
 * Window function: forwards every OrderLog in the window unchanged
 */
class SideOutPutWindowProcessFunction extends ProcessWindowFunction<OrderLog, OrderLog, String, TimeWindow> {
	private static final long serialVersionUID = -6632888020403733197L;

	@Override
	public void process(String key, ProcessWindowFunction<OrderLog, OrderLog, String, TimeWindow>.Context ctx,
			Iterable<OrderLog> it, Collector<OrderLog> collect) throws Exception {
		Iterator<OrderLog> iterator = it.iterator();
		while (iterator.hasNext()) {
			OrderLog orderLog = iterator.next();
			collect.collect(orderLog);
		}
	}
}

/**
 * Sink function
 */
class SideOutPutSinkFunction extends RichSinkFunction<OrderLog> {
	private static final long serialVersionUID = -6632888020403733197L;

	@Override
	public void invoke(OrderLog orderLog, Context context) throws Exception {
		// Replace with your own storage or computation logic
		System.out.println(JSON.toJSONString(orderLog));
	}
	
}
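The 16-hour offset on the one-day window (`TumblingEventTimeWindows.of(Time.days(1), Time.hours(16))`) is easy to misread. Flink aligns tumbling windows to the epoch, which is UTC, so a one-day window without an offset runs from 08:00 to 08:00 in UTC+8; shifting it by 16 hours (or equivalently by -8 hours, since Flink only requires the absolute offset to be smaller than the window size) moves the boundary to local midnight. The plain-Java sketch below reproduces the window-start formula of Flink's `TimeWindow.getWindowStartWithOffset` for non-negative timestamps. `WindowAlignment` and its helpers are illustrative names, and `Asia/Shanghai` is assumed as the local time zone.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Plain-Java sketch, not Flink code.
class WindowAlignment {
    static final long HOUR = 60L * 60 * 1000;
    static final long DAY = 24 * HOUR;

    // Start of the tumbling window containing ts, for a window of the given size and offset
    // (same arithmetic as Flink's TimeWindow.getWindowStartWithOffset for ts >= 0).
    static long windowStart(long ts, long offset, long size) {
        return ts - (ts - offset + size) % size;
    }

    // Renders an epoch-millisecond timestamp as local time in UTC+8.
    static LocalDateTime inShanghai(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneId.of("Asia/Shanghai")).toLocalDateTime();
    }

    public static void main(String[] args) {
        long ts = 1599931359071L; // requestTime of the first sample order
        System.out.println(inShanghai(ts));                              // 2020-09-13T01:22:39.071
        System.out.println(inShanghai(windowStart(ts, 0, DAY)));         // 2020-09-12T08:00
        System.out.println(inShanghai(windowStart(ts, 16 * HOUR, DAY))); // 2020-09-13T00:00
        System.out.println(inShanghai(windowStart(ts, -8 * HOUR, DAY))); // 2020-09-13T00:00
    }
}
```

Without the offset, the sample order placed at 01:22 local time would fall into a "day" that began at 08:00 the previous morning.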

The trigger OrderCountSingleTrigger used above is implemented as follows:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class OrderCountSingleTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long maxCount;
    // Per-key, per-window count of received elements
    private final ReducingStateDescriptor<Long> stateDesc;

    private OrderCountSingleTrigger(long maxCount) {
        this.stateDesc = new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(this.stateDesc);
        count.add(1L);
        if (count.get() >= this.maxCount) {
            count.clear();
            // Emit the window contents and discard them, so each firing only contains new elements
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(this.stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(this.stateDesc);
    }

    @Override
    public String toString() {
        return "OrderCountSingleTrigger(" + this.maxCount + ")";
    }

    public static <W extends Window> OrderCountSingleTrigger<W> of(long maxCount) {
        return new OrderCountSingleTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Sum() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }

}
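Why FIRE_AND_PURGE rather than FIRE? The plain-Java model below (not Flink code; `PurgeModel` is an illustrative name) simulates a window whose trigger fires after every element. With FIRE the window keeps its contents, so each firing re-emits everything received so far; with FIRE_AND_PURGE each firing contains only the new element. In the example job the key is orderId, which is unique per order, so each keyed window normally holds a single record; the difference shows up whenever one key receives several records. As far as I can tell, Flink's built-in `PurgingTrigger.of(CountTrigger.of(1))` should behave the same as `OrderCountSingleTrigger.of(1)`, because PurgingTrigger turns CountTrigger's FIRE into FIRE_AND_PURGE.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a window whose trigger fires on every element (maxCount = 1).
class PurgeModel {
    // Returns the window contents emitted at each firing.
    static List<List<String>> run(List<String> elements, boolean purgeOnFire) {
        List<String> windowContents = new ArrayList<>();
        List<List<String>> emitted = new ArrayList<>();
        for (String e : elements) {
            windowContents.add(e);
            // Fire: the window function sees a snapshot of the current contents
            emitted.add(new ArrayList<>(windowContents));
            if (purgeOnFire) {
                windowContents.clear(); // FIRE_AND_PURGE drops what was just emitted
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> orders = Arrays.asList("A", "B", "C");
        System.out.println(run(orders, false)); // FIRE:           [[A], [A, B], [A, B, C]]
        System.out.println(run(orders, true));  // FIRE_AND_PURGE: [[A], [B], [C]]
    }
}
```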
