Customizing the DateTime bucket for Flink's StreamingFileSink
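Flink consumes data from Kafka and, after cleansing, transforming and filtering it, sinks the result to Parquet files. The output directories must be partitioned by the event time of each event: for example, event1 occurred on 2018-03-19 and event2 on 2018-03-20, so the two belong in different directories. This means extracting the event time from each record and deriving the bucketId of the Parquet file from it. The code is as follows: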
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hellobike.realtimeplatform.utils;

import com.hellobike.realtimeplatform.model.AccessLog;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.util.Preconditions;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/**
 * A {@link BucketAssigner} that assigns elements to buckets based on the event time
 * carried by each {@link AccessLog} element, rather than on the current system time.
 *
 * <p>The {@code DateTimeBucketWithPartitionAssigner} will create directories of the following form:
 * {@code /{basePath}/pt={dateTimePath}/}. The {@code basePath} is the path
 * that was specified as a base path when creating the
 * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}.
 * The {@code dateTimePath} is determined based on the element's event timestamp and the
 * user provided format string.
 *
 * <p>{@link DateTimeFormatter} is used to derive a date string from the event timestamp and
 * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the
 * buckets will have a granularity of hours.
 *
 * <p>Example:
 *
 * <pre>{@code
 * BucketAssigner bucketAssigner = new DateTimeBucketWithPartitionAssigner("yyyy-MM-dd--HH");
 * }</pre>
 *
 * <p>This will create for example the following bucket path:
 * {@code /base/pt=1976-12-31--14/}
 *
 * <p>Elements without a usable timestamp go to {@code pt=errorTime}; elements that are not
 * {@link AccessLog} instances go to {@code pt=errorObjects}.
 */
@PublicEvolving
public class DateTimeBucketWithPartitionAssigner<IN> implements BucketAssigner<IN, String> {

    private static final long serialVersionUID = 1L;

    private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";

    private final String formatString;

    private final ZoneId zoneId;

    private transient DateTimeFormatter dateTimeFormatter;

    /**
     * Creates a new {@code DateTimeBucketWithPartitionAssigner} with format string {@code "yyyy-MM-dd--HH"}.
     */
    public DateTimeBucketWithPartitionAssigner() {
        this(DEFAULT_FORMAT_STRING);
    }

    /**
     * Creates a new {@code DateTimeBucketWithPartitionAssigner} with the given date/time format string.
     *
     * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
     *                     the bucket id.
     */
    public DateTimeBucketWithPartitionAssigner(String formatString) {
        this(formatString, ZoneId.systemDefault());
    }

    /**
     * Creates a new {@code DateTimeBucketWithPartitionAssigner} with format string {@code "yyyy-MM-dd--HH"}
     * using the given timezone.
     *
     * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
     */
    public DateTimeBucketWithPartitionAssigner(ZoneId zoneId) {
        this(DEFAULT_FORMAT_STRING, zoneId);
    }

    /**
     * Creates a new {@code DateTimeBucketWithPartitionAssigner} with the given date/time format string
     * using the given timezone.
     *
     * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
     *                     the bucket path.
     * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
     */
    public DateTimeBucketWithPartitionAssigner(String formatString, ZoneId zoneId) {
        this.formatString = Preconditions.checkNotNull(formatString);
        this.zoneId = Preconditions.checkNotNull(zoneId);
    }

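    @Override
    public String getBucketId(IN element, Context context) {
        // The formatter is transient, so it is (re)created lazily on first use.
        if (dateTimeFormatter == null) {
            dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
        }
        if (!(element instanceof AccessLog)) {
            // Anything that is not an AccessLog goes to a dedicated error bucket.
            return "pt=errorObjects";
        }
        String eventTimestamp = ((AccessLog) element).getEventTimestamp();
        // A missing timestamp cannot be bucketed by date, so route it to an error bucket.
        if (eventTimestamp == null || eventTimestamp.isEmpty()) {
            return "pt=errorTime";
        }
        try {
            // The timestamp is a string holding epoch milliseconds.
            long eventTime = Long.parseLong(eventTimestamp);
            return "pt=" + dateTimeFormatter.format(Instant.ofEpochMilli(eventTime));
        } catch (NumberFormatException e) {
            // A non-numeric timestamp would otherwise fail the sink; bucket it as an error.
            return "pt=errorTime";
        }
    }
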
    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }

    @Override
    public String toString() {
        return "DateTimeBucketWithPartitionAssigner{" +
            "formatString='" + formatString + '\'' +
            ", zoneId=" + zoneId +
            '}';
    }
}
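Specify this custom DateTime assigner as the sink's bucket assigner (for example via `withBucketAssigner` on the StreamingFileSink builder), and the Parquet files are written into buckets based on event time.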
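
To make the bucket-id derivation concrete without a Flink cluster, here is a minimal sketch that uses only `java.time`. The class `EventTimeBucketIdSketch` and its `bucketId` method are hypothetical helpers introduced for illustration; they are not part of Flink or of the assigner above. The sketch assumes, as the assigner does, that the event timestamp arrives as a string of epoch milliseconds.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration only; not part of Flink.
final class EventTimeBucketIdSketch {

    // Turns an epoch-millisecond string into a "pt=<formatted date>" bucket id.
    static String bucketId(String eventTimestampMillis, String pattern, ZoneId zone) {
        if (eventTimestampMillis == null || eventTimestampMillis.isEmpty()) {
            return "pt=errorTime";
        }
        try {
            long millis = Long.parseLong(eventTimestampMillis);
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern).withZone(zone);
            return "pt=" + formatter.format(Instant.ofEpochMilli(millis));
        } catch (NumberFormatException e) {
            // Non-numeric timestamps cannot be bucketed by date.
            return "pt=errorTime";
        }
    }

    public static void main(String[] args) {
        // Two events on consecutive days, expressed as epoch milliseconds.
        String event1 = String.valueOf(Instant.parse("2018-03-19T10:15:30Z").toEpochMilli());
        String event2 = String.valueOf(Instant.parse("2018-03-20T08:00:00Z").toEpochMilli());
        // An event late in the UTC day: its bucket depends on the configured zone.
        String lateEvent = String.valueOf(Instant.parse("2018-03-19T23:30:00Z").toEpochMilli());

        ZoneId utc = ZoneId.of("UTC");
        System.out.println(bucketId(event1, "yyyy-MM-dd", utc));    // pt=2018-03-19
        System.out.println(bucketId(event2, "yyyy-MM-dd", utc));    // pt=2018-03-20
        System.out.println(bucketId(lateEvent, "yyyy-MM-dd", utc)); // pt=2018-03-19
        System.out.println(bucketId(lateEvent, "yyyy-MM-dd", ZoneId.of("Asia/Shanghai"))); // pt=2018-03-20
        System.out.println(bucketId("", "yyyy-MM-dd", utc));        // pt=errorTime
    }
}
```

The last two timestamps show why the `ZoneId` constructor argument matters: the same instant lands in different day buckets depending on the zone, so it is usually worth passing the zone explicitly rather than relying on `ZoneId.systemDefault()`.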