Using the KafkaConsumer
Flink provides many connector components, and the Kafka connector is among the most widely used. This section takes a detailed look at how the Kafka connector is applied in Flink.
For Flink stream processing, Kafka is the most commonly paired component: raw log data is collected into Kafka by log collection tools for Flink to process, and the processed results are often written back to Kafka, so Kafka can serve as both a DataSource and a DataSink for Flink. Moreover, Kafka's partition mechanism pairs naturally with Flink's parallelism mechanism to improve read and write throughput (a short sketch of this pairing follows the topic-creation step below).
To use Kafka in Flink, first add the corresponding dependency. Note that the _2.12 suffix of the artifact must match the Scala version of your project.
<properties>
    ...
    <flink.version>1.11.1</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
1) Scala implementation
package com.simoniu.flink.scala.kafkaconnector

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * Flink consuming data from Kafka
 * Created by simoniu
 */
object StreamKafkaSourceScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //Specify the FlinkKafkaConsumer configuration
    val topic = "t1"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "master:9092,slave1:9092,slave2:9092")
    prop.setProperty("group.id", "con1")
    val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
    //Use Kafka as the Source (the implicit TypeInformation import below is required)
    import org.apache.flink.api.scala._
    val text = env.addSource(kafkaConsumer)
    //Print the consumed data to the console
    text.print()
    env.execute("StreamKafkaSourceScala")
  }
}
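By default, FlinkKafkaConsumer resumes reading from the offsets committed for the consumer group. If a different start position is needed, the consumer exposes setters for it. A minimal sketch of the options, to be placed before addSource in the example above:
//The start-position setters available on FlinkKafkaConsumer; call one of
//them before passing the consumer to env.addSource.
kafkaConsumer.setStartFromEarliest()       //read every partition from the beginning
//kafkaConsumer.setStartFromLatest()       //read only records arriving after startup
//kafkaConsumer.setStartFromGroupOffsets() //the default: resume from committed offsets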
Before running the code, start the ZooKeeper cluster and the Kafka cluster, then create topic t1 in Kafka:
[root@master kafka]# cd $KAFKA_HOME
[root@master kafka]# bin/kafka-topics.sh --create --zookeeper master:2181 --partitions 5 --replication-factor 2 --topic t1
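The topic above is created with 5 partitions, which ties back to the partition/parallelism pairing mentioned earlier: if the source parallelism matches the partition count, each consumer subtask reads exactly one partition. A minimal sketch, continuing from the Scala example above:
//A sketch of matching source parallelism to the partition count of topic t1.
//With 5 partitions and parallelism 5, each Kafka partition is assigned to
//exactly one consumer subtask, so no subtask sits idle or reads two partitions.
val text = env.addSource(kafkaConsumer).setParallelism(5)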
Then run the program, and start a Kafka console producer to generate test data and verify the result:
[root@master kafka]# bin/kafka-console-producer.sh --broker-list master:9092 --topic t1
>hello flink
Console output of the program (the leading 6> is the index of the parallel print subtask that received the record):
6> hello flink
2) Java implementation
package com.simoniu.flink.java.kafkaconnector;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * Flink consuming data from Kafka
 * Created by simoniu
 */
public class StreamKafkaSourceJavaDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Specify the FlinkKafkaConsumer configuration
        String topic = "t1";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        prop.setProperty("group.id", "con1");
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
        //Use Kafka as the Source
        DataStreamSource<String> text = env.addSource(kafkaConsumer);
        //Print the consumed data to the console
        text.print();
        env.execute("StreamKafkaSourceJava");
    }
}
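One production detail worth noting: without checkpointing enabled, the consumer relies on the Kafka client's periodic auto-commit for offsets, which gives no processing guarantees. Once checkpointing is enabled, FlinkKafkaConsumer commits offsets as part of each completed checkpoint instead. A minimal sketch of the relevant settings, shown in Scala for brevity (the same methods exist in the Java API; the interval value is an illustrative choice, not a recommendation):
//A sketch: enabling checkpointing so offsets are committed on checkpoints.
//The 5000 ms interval is an illustrative value, not a recommendation.
env.enableCheckpointing(5000)
//Explicitly tie offset committing to checkpoints (this is already the
//default behavior once checkpointing is enabled).
kafkaConsumer.setCommitOffsetsOnCheckpoints(true)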