Flink消费Kafka
Flink是由Apache开源的分布式流处理框架,常用来处理实时性的任务,与Spark Streaming和Strom功能上类似,具体选择需要根据业务场景来进行选择。本文将通过Flink的方式来消费Kafka数据,基于以下版本开发测试。scala-version:2.11.8flink-version:1.8.0kafka-version:1.1...
·
Flink是由Apache开源的分布式流处理框架,常用来处理实时性的任务,与Spark Streaming和Strom功能上类似,具体选择需要根据业务场景来进行选择。
本文将通过Flink的方式来消费Kafka数据,基于以下版本开发测试。
scala-version:2.11.8
flink-version:1.8.0
kafka-version:1.1.0
接下来正式开始了。
1. 添加所需的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.8.0</version>
</dependency>
2. Flink消费kafka
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.api.scala._
object ReadingFromKafka {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(1000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val kafkaProps = new Properties()
kafkaProps.setProperty("zookeeper.connect", "localhost:2181")
kafkaProps.setProperty("bootstrap.servers", "localhost:9092")
kafkaProps.setProperty("group.id", "test")
val transaction = env.addSource(new FlinkKafkaConsumer011[String]("test", new SimpleStringSchema(), kafkaProps))
transaction.print()
env.execute()
}
}
3. 运行代码程序然后用命令启动Kafka生产者发送数据
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>flink spark strom
4. 打印结果如下
flink spark strom
更多推荐
已为社区贡献1条内容
所有评论(0)