← 返回首页
SparkStreaming整合Kafka
发表时间:2023-12-21 04:12:11
SparkStreaming整合Kafka

SparkStreaming整合Kafka

1.Spark Streaming整合Kafka

需求:使用SparkStreaming实时消费Kafka中的数据。

首先需要在pom.xml中引入spark-streaming-kafka-0-10的依赖(artifactId末尾的_2.11表示Scala版本,需与项目所用的Scala版本保持一致)。

<dependencies> 
   ...
  <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
     <version>${spark-version}</version>
  </dependency>
</dependencies> 

scala代码如下:

package com.simoniu.scalademo
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming demo that consumes records from Kafka and prints them.
 * Created by simoniu
 */
object StreamKafkaScalaDemo {

  /**
   * Builds a direct Kafka stream on topic "t1", prints each batch as
   * (key, value) pairs and blocks until the streaming context terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Local two-thread Spark configuration for this demo.
    val sparkConf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("StreamKafkaScala")
    // A new micro-batch is produced every 5 seconds.
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    // Kafka consumer settings handed to the subscribe strategy below.
    val consumerConfig = Map[String, Object](
      // Kafka broker addresses.
      "bootstrap.servers" -> "master:9092,slave1:9092,slave2:9092",
      // Deserializer for record keys.
      "key.deserializer" -> classOf[StringDeserializer],
      // Deserializer for record values.
      "value.deserializer" -> classOf[StringDeserializer],
      // Consumer group id.
      "group.id" -> "con_2",
      // Where to start reading when the group has no committed offset.
      "auto.offset.reset" -> "latest",
      // Let the Kafka consumer commit offsets automatically.
      // NOTE(review): the Spark Kafka 0-10 integration guide advises against
      // auto-commit for direct streams; confirm this is intended for the demo.
      "enable.auto.commit" -> java.lang.Boolean.TRUE
    )

    // Topics to subscribe to.
    val subscribedTopics = Array("t1")

    // Direct stream of Kafka consumer records with String keys and values.
    val subscription =
      ConsumerStrategies.Subscribe[String, String](subscribedTopics, consumerConfig)
    val recordStream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      LocationStrategies.PreferConsistent,
      subscription
    )

    // Reduce every record to a (key, value) pair and print each batch to the console.
    val keyValuePairs = recordStream.map(rec => (rec.key(), rec.value()))
    keyValuePairs.print()

    // Start the job, then block until it is stopped.
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

使用kafka的console生产者模拟产生数据。

[root@master kafka]# cd $KAFKA_HOME
[root@master kafka]# bin/kafka-console-producer.sh --broker-list master:9092 --topic t1
>welcome hadoop

在IDEA中运行代码,控制台输出结果如下(控制台生产者发送消息时没有指定key,因此打印出的key为null):

-------------------------------------------
Time: 1703146825000 ms
-------------------------------------------
(null,welcome hadoop)

java代码实现:

package com.simoniu.sparkdemo.javademo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.HashMap;

/**
 * Spark Streaming demo that consumes records from Kafka and prints them.
 * Created by simoniu
 */
public class StreamKafkaJavaDemo {

    /**
     * Builds a direct Kafka stream on topic "t1", prints each batch as
     * (key, value) pairs and blocks until the streaming context terminates.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if waiting for the streaming context to terminate fails
     */
    public static void main(String[] args) throws Exception{
        // Create the streaming context with a 5-second batch interval.
        // The application name matches this Kafka demo (it previously carried
        // a name left over from a word-count example).
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("StreamKafkaJava");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Kafka consumer settings.
        HashMap<String, Object> kafkaParams = new HashMap<String, Object>();
        // Kafka broker addresses.
        kafkaParams.put("bootstrap.servers","master:9092,slave1:9092,slave2:9092");
        // Deserializers for record keys and values, given by class name.
        kafkaParams.put("key.deserializer", StringDeserializer.class.getName());
        kafkaParams.put("value.deserializer", StringDeserializer.class.getName());
        // Consumer group id.
        kafkaParams.put("group.id","con_2");
        // Where to start reading when the group has no committed offset.
        kafkaParams.put("auto.offset.reset","latest");
        // Let the Kafka consumer commit offsets automatically.
        // NOTE(review): the Spark Kafka 0-10 integration guide advises against
        // auto-commit for direct streams; confirm this is intended for the demo.
        kafkaParams.put("enable.auto.commit",true);

        // Topics to subscribe to.
        ArrayList<String> topics = new ArrayList<String>();
        topics.add("t1");

        // Direct stream of Kafka consumer records with String keys and values.
        JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        // Reduce every record to a (key, value) pair and print each batch to the console.
        kafkaStream.map(new Function<ConsumerRecord<String, String>, Tuple2<String,String>>() {
            public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
                return new Tuple2<String, String>(record.key(),record.value());
            }
        }).print();

        // Start the job.
        ssc.start();
        // Block until the job is stopped.
        ssc.awaitTermination();
    }
}