DataStream API: DataSink
Published: 2023-11-23 15:44:45

1. DataSink

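A DataSink is the output component: it writes the computed results to another storage system. Flink can write stream data to files, but this is uncommon in practice. Processed stream data is usually stored in a message queue or a database rather than in files. There is also print, which writes records straight to the console. We have already used it many times. It is mainly a testing tool that makes the output easy to inspect.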
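To make the idea concrete without a cluster, here is a minimal plain-Java sketch of what a sink does conceptually. The runtime hands every record to the sink, and the sink writes it somewhere. The `SimpleSink` interface, `PrintSink`, `CollectingSink` and `run` are hypothetical names for illustration only. They loosely mirror the per-record `invoke` method of Flink's `SinkFunction` but are not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

public class SinkSketch {
    // Hypothetical, simplified sink contract (not Flink's API): called once per record.
    interface SimpleSink<T> {
        void invoke(T value);
    }

    // Behaves like print(): writes each record to stdout, handy while testing.
    static class PrintSink<T> implements SimpleSink<T> {
        public void invoke(T value) {
            System.out.println(value);
        }
    }

    // Stands in for an external store (message queue, database): keeps records in memory.
    static class CollectingSink<T> implements SimpleSink<T> {
        final List<T> stored = new ArrayList<>();

        public void invoke(T value) {
            stored.add(value);
        }
    }

    // Pushes every record of a finite "stream" into the given sink, in order.
    static <T> void run(List<T> stream, SimpleSink<T> sink) {
        for (T record : stream) {
            sink.invoke(record);
        }
    }

    public static void main(String[] args) {
        List<String> words = List.of("hello linux", "hello flink");
        run(words, new PrintSink<>());
        CollectingSink<String> store = new CollectingSink<>();
        run(words, store);
        System.out.println(store.stored); // prints [hello linux, hello flink]
    }
}
```

The processing logic does not change when the destination changes. Only the sink implementation is swapped, which is the same reason print() can be replaced by a Redis or Kafka sink once testing is done.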

Flink provides a set of connectors for writing to third-party destinations, as shown in the following table:

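In practice, Kafka and Redis are the most commonly used. The fault-tolerance guarantees offered by Flink's common sink components are shown in the following table: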

Note: the Redis sink is part of the Apache Bahir package, so the corresponding dependency must be added to pom.xml.

Redis sink example:

Requirement: receive data sent over a socket and store it in a Redis list.

1) Scala implementation

Add the Bahir dependency to pom.xml.

<properties>
   ...
   <bahir.version>1.0</bahir.version>
</properties>

<dependencies>
  ...
  <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>${bahir.version}</version>
  </dependency>
</dependencies>

The Scala code:

package com.simoniu.flink.scala.stream.sink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
 * Requirement: receive data sent over a socket and store it in a Redis list.
 * Created by simoniu
 */
object StreamRedisSinkScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Connect to the socket to read the input data
    val text = env.socketTextStream("master", 9001)

    import org.apache.flink.api.scala._
    // Build the records as Tuple2 values:
    // the first element is the key (name) of the Redis list
    // the second element is the value to add to that list
    val listData = text.map(word => ("l_words_scala", word))

    // Configure the Redis sink (host, port, password)
    val conf = new FlinkJedisPoolConfig.Builder().setHost("master").setPort(6379).setPassword("helloworld").build()
    val redisSink = new RedisSink[Tuple2[String, String]](conf, new MyRedisMapper)
    listData.addSink(redisSink)

    env.execute("StreamRedisSinkScala")
  }

  class MyRedisMapper extends RedisMapper[Tuple2[String, String]] {
    // Specify the Redis command to execute (LPUSH)
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.LPUSH)
    }

    // Extract the key
    override def getKeyFromData(data: (String, String)): String = {
      data._1
    }

    // Extract the value
    override def getValueFromData(data: (String, String)): String = {
      data._2
    }
  }
}
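The mapper above never talks to Redis itself. It only tells the sink which command to run and how to pull the key and value out of each record. The following plain-Java sketch imitates that contract to show what a single record turns into. `Mapper`, `ListMapper` and `toCommand` are hypothetical stand-ins for Bahir's `RedisMapper`, and `Map.Entry` replaces Flink's `Tuple2` so the sketch needs only the JDK.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class RedisMapperSketch {
    // Hypothetical stand-in for Bahir's RedisMapper: command, key and value per record.
    interface Mapper<T> {
        String command();
        String key(T data);
        String value(T data);
    }

    // Mirrors MyRedisMapper: LPUSH, key = first field, value = second field.
    static class ListMapper implements Mapper<Map.Entry<String, String>> {
        public String command() { return "LPUSH"; }
        public String key(Map.Entry<String, String> data) { return data.getKey(); }
        public String value(Map.Entry<String, String> data) { return data.getValue(); }
    }

    // Formats the command for one record like the command part of a MONITOR line.
    static <T> String toCommand(Mapper<T> mapper, T data) {
        return String.format("\"%s\" \"%s\" \"%s\"",
                mapper.command(), mapper.key(data), mapper.value(data));
    }

    public static void main(String[] args) {
        Map.Entry<String, String> record = new SimpleEntry<>("l_words_scala", "hello linux");
        System.out.println(toCommand(new ListMapper(), record));
        // prints: "LPUSH" "l_words_scala" "hello linux"
    }
}
```

This matches the shape of the `"LPUSH" "l_words_scala" "hello linux"` entries that appear in the Redis MONITOR output for this example.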

Note: before running the code, start the socket listener (nc) and the Redis server.

[root@master ~]# nc -l 9001
hello linux
hello flink

In redis-cli, use the monitor and lrange commands to check the output:

127.0.0.1:6379> monitor
OK
1700841324.885770 [0 192.168.128.1:8807] "LPUSH" "l_words_scala" "hello linux"
1700841334.546623 [0 192.168.128.1:8811] "LPUSH" "l_words_scala" "hello flink"
^Z


127.0.0.1:6379> lrange l_words_scala 0 -1
1) "hello flink"
2) "hello linux"
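Note that lrange lists "hello flink" before "hello linux", although "hello linux" arrived first. This happens because LPUSH inserts each new element at the head of the list. The plain-Java sketch below simulates a Redis list with an `ArrayDeque` to show the effect. It is not a Redis client, and `lpush`, `rpush` and `lrangeAll` are hypothetical helper names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class LPushOrderSketch {
    // LPUSH semantics: insert at the head of the list.
    static void lpush(Deque<String> list, String value) {
        list.addFirst(value);
    }

    // RPUSH semantics: append at the tail of the list.
    static void rpush(Deque<String> list, String value) {
        list.addLast(value);
    }

    // LRANGE key 0 -1 semantics: every element, from head to tail.
    static List<String> lrangeAll(Deque<String> list) {
        return new ArrayList<>(list);
    }

    public static void main(String[] args) {
        Deque<String> viaLpush = new ArrayDeque<>();
        Deque<String> viaRpush = new ArrayDeque<>();
        for (String line : List.of("hello linux", "hello flink")) {
            lpush(viaLpush, line);
            rpush(viaRpush, line);
        }
        System.out.println(lrangeAll(viaLpush)); // prints [hello flink, hello linux]
        System.out.println(lrangeAll(viaRpush)); // prints [hello linux, hello flink]
    }
}
```

If arrival order matters, returning `RedisCommand.RPUSH` from `getCommandDescription` should keep the list in insertion order. Confirm this against the Bahir version in use.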

2) Java implementation

package com.simoniu.flink.java.stream.sink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Requirement: receive data sent over a socket and store it in a Redis list.
 * Created by simoniu
 */
public class StreamRedisSinkJavaDemo {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Connect to the socket to read the input data
        DataStreamSource<String> text = env.socketTextStream("master", 9001);
        // Build the records as Tuple2 values: (list key, value to add)
        SingleOutputStreamOperator<Tuple2<String, String>> listData = text.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String word) throws Exception {
                return new Tuple2<String, String>("l_words_java", word);
            }
        });

        // Configure the Redis sink (host, port, password)
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("master").setPort(6379).setPassword("helloworld").build();
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        listData.addSink(redisSink);

        env.execute("StreamRedisSinkJava");
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>>{
        // Specify the Redis command to execute (LPUSH)
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }

        // Extract the key
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }
        // Extract the value
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }
    }
}

As before, start the socket listener (nc) and the Redis server first.

[root@master ~]# nc -l 9001
hello spark
hello hive

In redis-cli, use the monitor and lrange commands to check the output:

127.0.0.1:6379> monitor
OK
1700841864.786425 [0 192.168.128.1:9110] "LPUSH" "l_words_java" "hello spark"
1700841864.796515 [0 192.168.128.1:9111] "LPUSH" "l_words_java" "hello hive"

^Z


127.0.0.1:6379> lrange l_words_java 0 -1
1) "hello hive"
2) "hello spark"