Flink is a distributed stream-processing framework open-sourced under Apache, commonly used for real-time workloads. It is functionally similar to Spark Streaming and Storm; which one to choose depends on your business scenario.

This article walks through consuming Kafka data from Flink. The code was developed and tested against the following versions:

      scala-version: 2.11.8
      flink-version: 1.8.0
      kafka-version: 1.1.0

Let's get started.

1. Add the required dependencies

The Flink streaming Scala API and the Kafka 0.11 connector are needed; with Maven:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
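If the project is built with sbt rather than Maven, the equivalent dependencies would look roughly like this (a sketch; the %% operator derives the _2.11 suffix from scalaVersion):

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % "1.8.0",
  "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.8.0"
)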

2. Consuming Kafka with Flink

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.api.scala._

object ReadingFromKafka {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event-time semantics (timestamps and watermarks would also be
    // needed for event-time operations; see the sketch after this listing).
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Checkpoint every 1000 ms in exactly-once mode, so Kafka offsets are
    // committed consistently with the job's state.
    env.enableCheckpointing(1000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // The 0.11 connector only needs bootstrap.servers and group.id;
    // zookeeper.connect is a legacy setting for the old 0.8 consumer
    // and can be omitted here.
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092")
    kafkaProps.setProperty("group.id", "test")

    // Read the "test" topic, deserializing each record as a UTF-8 string.
    val transaction = env.addSource(
      new FlinkKafkaConsumer011[String]("test", new SimpleStringSchema(), kafkaProps))

    transaction.print()
    env.execute("ReadingFromKafka")
  }
}
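Note that the job sets TimeCharacteristic.EventTime but never assigns timestamps or watermarks; printing works regardless, but any event-time operation (a window, for example) would need them. Here is a minimal sketch of how they could be assigned, assuming a hypothetical record format of "payload,timestampMillis" (not part of the original example):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Accept events that arrive up to 5 seconds out of order.
val withTimestamps = transaction.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(5)) {
    // Hypothetical record format: "payload,timestampMillis"
    override def extractTimestamp(element: String): Long =
      element.split(",")(1).toLong
  })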

3. Run the program, then start a Kafka console producer from the command line and send some data

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>flink spark storm
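If the broker has automatic topic creation disabled (auto.create.topics.enable=false), the test topic has to be created first; for Kafka 1.1.0 that would be something like:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test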

4. The printed output is as follows

flink spark storm
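Since the consumer yields an ordinary DataStream[String], any downstream transformation can follow. As a quick sketch (assuming it is placed in the same main method, so the scala API imports are in scope), a running word count over the consumed records could look like:

// Split each record into words, pair each word with a count of 1,
// key by the word, and keep a running sum per word.
val counts = transaction
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .keyBy(0)
  .sum(1)
counts.print()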
