Spark Streaming Integration with Kafka
Requirement: use Spark Streaming to consume data from Kafka in real time.
First, add the spark-streaming-kafka dependency:
<dependencies>
    ...
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark-version}</version>
    </dependency>
</dependencies>
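The ${spark-version} placeholder assumes a matching property is defined elsewhere in the POM, and the _2.11 suffix means the artifact is built for Scala 2.11, so the version must match both your Spark installation and its Scala build. A minimal sketch (2.4.8 is only an illustrative Spark 2.x version, not something fixed by this tutorial):

<properties>
    <!-- Illustrative version; use the Spark version of your cluster -->
    <spark-version>2.4.8</spark-version>
</properties>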
The Scala code is as follows:
package com.simoniu.scalademo

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark consuming data from Kafka
 * Created by simoniu
 */
object StreamKafkaScalaDemo {
  def main(args: Array[String]): Unit = {
    // Create the StreamingContext with a 5-second batch interval
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamKafkaScala")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Kafka consumer configuration
    val kafkaParams = Map[String, Object](
      // Kafka broker addresses
      "bootstrap.servers" -> "master:9092,slave1:9092,slave2:9092",
      // Deserializer class for record keys
      "key.deserializer" -> classOf[StringDeserializer],
      // Deserializer class for record values
      "value.deserializer" -> classOf[StringDeserializer],
      // Consumer group id
      "group.id" -> "con_2",
      // Where to start when there is no committed offset: latest = newest records only
      "auto.offset.reset" -> "latest",
      // Let the Kafka consumer commit offsets automatically
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    // Topic(s) to subscribe to
    val topics = Array("t1")
    // Create the direct stream; PreferConsistent distributes partitions evenly across executors
    val kafkaDStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    // Transform each record into a (key, value) tuple
    kafkaDStream.map(record => (record.key(), record.value()))
      // Print the result to the console
      .print()
    // Start the job
    ssc.start()
    // Block until the job is stopped
    ssc.awaitTermination()
  }
}
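Note that enable.auto.commit is set to true above, so the Kafka consumer commits offsets on its own schedule, regardless of whether the batch was actually processed. If you need at-least-once semantics, the kafka010 module lets you commit offsets yourself after each batch completes, via HasOffsetRanges and CanCommitOffsets. A minimal sketch, assuming the same kafkaDStream as above but with "enable.auto.commit" -> (false: java.lang.Boolean):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

kafkaDStream.foreachRDD { rdd =>
  // Offset ranges covered by this batch, one per Kafka partition;
  // the cast only works on the untransformed RDD from the direct stream
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Process the batch first...
  rdd.map(record => (record.key(), record.value())).foreach(println)
  // ...then commit the offsets back to Kafka asynchronously
  kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}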
Use the Kafka console producer to generate test data.
[root@master kafka]# cd $KAFKA_HOME
[root@master kafka]# bin/kafka-console-producer.sh --broker-list master:9092 --topic t1
>welcome hadoop
Run the code in IDEA; the console output is:
-------------------------------------------
Time: 1703146825000 ms
-------------------------------------------
(null,welcome hadoop)
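The key is null because kafka-console-producer.sh sends records without a key by default; the value is the line typed at the prompt.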
The Java implementation:
package com.simoniu.sparkdemo.javademo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.HashMap;

/**
 * Spark consuming data from Kafka
 * Created by simoniu
 */
public class StreamKafkaJavaDemo {
    public static void main(String[] args) throws Exception {
        // Create the StreamingContext with a 5-second batch interval
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("StreamKafkaJava");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
        // Kafka consumer configuration
        HashMap<String, Object> kafkaParams = new HashMap<String, Object>();
        // Kafka broker addresses
        kafkaParams.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        // Deserializer classes for record keys and values
        kafkaParams.put("key.deserializer", StringDeserializer.class.getName());
        kafkaParams.put("value.deserializer", StringDeserializer.class.getName());
        // Consumer group id
        kafkaParams.put("group.id", "con_2");
        // Where to start when there is no committed offset: latest = newest records only
        kafkaParams.put("auto.offset.reset", "latest");
        // Let the Kafka consumer commit offsets automatically
        kafkaParams.put("enable.auto.commit", true);
        // Topic(s) to subscribe to
        ArrayList<String> topics = new ArrayList<String>();
        topics.add("t1");
        // Create the direct stream
        JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
        // Transform each record into a (key, value) tuple and print it to the console
        kafkaStream.map(new Function<ConsumerRecord<String, String>, Tuple2<String, String>>() {
            public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
                return new Tuple2<String, String>(record.key(), record.value());
            }
        }).print();
        // Start the job
        ssc.start();
        // Block until the job is stopped
        ssc.awaitTermination();
    }
}
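The Java version behaves exactly like the Scala one: start the console producer on topic t1 and the (key, value) tuples are printed every 5 seconds. The caveat about enable.auto.commit applies here as well; the manual offset-commit pattern sketched after the Scala example ports directly using the same kafka010 classes.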