Using KafkaConsumer
Published: 2023-12-20 07:36:23

Flink provides many connector components, and one of the most widely used is the Kafka connector. In this section we take a detailed look at how the Kafka connector is used in Flink.

1.Kafka-Connector

For Flink stream processing, the most commonly used companion component is Kafka. Raw log data is collected into Kafka by log-collection tools and then processed by Flink, and the processed results are often written back to Kafka, so Kafka can serve as both a DataSource and a DataSink for Flink. Moreover, Kafka's partition mechanism integrates closely with Flink's parallelism mechanism, improving both read and write throughput.
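To make the partition/parallelism interplay concrete, the sketch below simulates distributing the partitions of a topic across parallel consumer subtasks in a round-robin fashion. This is a simplified illustration of the idea, not Flink's exact internal assignment algorithm (Flink additionally offsets the rotation by a hash of the topic name).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionAssignmentSketch {

    // Hypothetical round-robin assignment of Kafka partitions to Flink subtasks.
    // Returns a map: subtask index -> list of partitions it reads.
    static Map<Integer, List<Integer>> assign(int numPartitions, int parallelism) {
        Map<Integer, List<Integer>> bySubtask = new HashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            bySubtask.computeIfAbsent(p % parallelism, k -> new ArrayList<>()).add(p);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // 5 partitions (like topic t1 below) spread across 3 parallel subtasks:
        // each subtask reads at most ceil(5/3) = 2 partitions.
        System.out.println(assign(5, 3));
    }
}
```

With 5 partitions and parallelism 3, two subtasks each read two partitions and one reads a single partition; raising the source parallelism beyond the partition count leaves some subtasks idle, which is why partition count effectively caps useful source parallelism.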

To use Kafka in Flink, first add the corresponding dependency.

<properties>
  ...
  <flink.version>1.11.1</flink.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>

1) Scala implementation

package com.simoniu.flink.scala.kafkaconnector

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * Consume data from Kafka with Flink
 * Created by simoniu
 */
object StreamKafkaSourceScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //configure the FlinkKafkaConsumer
    val topic = "t1"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "master:9092,slave1:9092,slave2:9092")
    prop.setProperty("group.id", "con1")
    val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)

    //use Kafka as the source
    import org.apache.flink.api.scala._
    val text = env.addSource(kafkaConsumer)

    //print the consumed records to the console
    text.print()

    env.execute("StreamKafkaSourceScala")
  }
}

Before running the code, start the ZooKeeper cluster and the Kafka cluster, then create the topic t1 in Kafka:

[root@master kafka]# cd $KAFKA_HOME
[root@master kafka]# bin/kafka-topics.sh --create --zookeeper master:2181 --partitions 5 --replication-factor 2 --topic t1

Then start the program, and start a Kafka console producer to generate test data and verify the result.

[root@master kafka]# bin/kafka-console-producer.sh --broker-list master:9092 --topic t1
>hello flink

Console output of the program (the `6>` prefix is the index of the parallel subtask that printed the record):

6> hello flink

2) Java implementation

package com.simoniu.flink.java.kafkaconnector;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * Consume data from Kafka with Flink
 * Created by simoniu
 */
public class StreamKafkaSourceJavaDemo {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //configure the FlinkKafkaConsumer
        String topic = "t1";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers","master:9092,slave1:9092,slave2:9092");
        prop.setProperty("group.id","con1");
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);

        //use Kafka as the source
        DataStreamSource<String> text = env.addSource(kafkaConsumer);

        //print the consumed records to the console
        text.print();

        env.execute("StreamKafkaSourceJava");
    }
}
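In both demos the consumer starts from the committed offsets of group con1 (falling back to the `auto.offset.reset` setting when no offsets exist). The start position can also be set explicitly on the consumer; the fragment below sketches the options against the Flink 1.11 API. It extends the consumer setup shown above and will not run standalone (it needs the same Flink dependencies and a surrounding job).

```java
//configuration fragment: explicit start positions for FlinkKafkaConsumer (Flink 1.11)
FlinkKafkaConsumer<String> kafkaConsumer =
        new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);

//pick one start position:
kafkaConsumer.setStartFromGroupOffsets(); //default: committed group offsets, falling back to auto.offset.reset
//kafkaConsumer.setStartFromEarliest();   //ignore committed offsets, read from the beginning
//kafkaConsumer.setStartFromLatest();     //ignore committed offsets, read only new records
```

Note that when checkpointing is enabled, the consumer commits offsets back to Kafka on checkpoint completion rather than relying on Kafka's periodic auto-commit.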