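DataStream API: DataSink
A DataSink is the output component. It writes the computed results to an external storage system. Flink can write stream data to files, but this is uncommon in practice: processed stream data is usually written to a message queue or a database, and rarely to files. There is also print, which writes every record straight to standard output. We have already used it many times; it is mainly a testing aid that makes the results easy to inspect.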
Flink provides a set of connectors for writing to third-party destinations, as listed in the table below:

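In practice, Kafka and Redis are the most commonly used. The fault-tolerance guarantees that Flink's common sink components can provide are listed in the table below: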

Note: the Redis sink is part of the Apache Bahir project, so the corresponding dependency must be added to pom.xml.
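Why the guarantee level matters for a Redis list: if a sink only offers at-least-once delivery, records that were already written before a failure may be written again when the job restarts from its last checkpoint. Whether such a replay leaves visible duplicates depends on whether the Redis command is idempotent. The sketch below is plain Java and uses neither Flink nor Redis; `ReplayDemo` and `InMemoryRedis` are hypothetical classes that model only LPUSH and SET, and the "replay" is simply writing the same batch twice.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo: writes one batch twice to imitate a replay after recovery.
class ReplayDemo {
    static InMemoryRedis writeWithReplay(List<String> batch) {
        InMemoryRedis redis = new InMemoryRedis();
        for (int attempt = 0; attempt < 2; attempt++) {
            for (String word : batch) {
                redis.lpush("l_words", word);  // not idempotent: each replay adds elements
                redis.set("s:" + word, word);  // idempotent: a replay rewrites the same value
            }
        }
        return redis;
    }

    public static void main(String[] args) {
        InMemoryRedis redis = writeWithReplay(List.of("hello linux", "hello flink"));
        System.out.println(redis.lrangeAll("l_words")); // list contents, duplicates included
        System.out.println(redis.strings.size());       // number of distinct SET keys
    }
}

// Hypothetical in-memory stand-in for Redis: it models only LPUSH and SET semantics.
class InMemoryRedis {
    final Map<String, Deque<String>> lists = new HashMap<>();
    final Map<String, String> strings = new HashMap<>();

    // LPUSH: insert at the head of the list, creating the list if needed.
    void lpush(String key, String value) {
        lists.computeIfAbsent(key, k -> new ArrayDeque<>()).addFirst(value);
    }

    // SET: overwrite whatever value is stored under the key.
    void set(String key, String value) {
        strings.put(key, value);
    }

    // Equivalent of LRANGE key 0 -1: all elements from head to tail.
    List<String> lrangeAll(String key) {
        return new ArrayList<>(lists.getOrDefault(key, new ArrayDeque<>()));
    }
}
```

With LPUSH the list ends up holding four elements, because every replayed record is pushed again; with SET the two keys are only overwritten with the values they already had.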
Redis sink example:
Requirement: receive data sent over a socket and save it to a Redis list.
1) Scala implementation
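Add the Bahir dependency to pom.xml (the _2.11 suffix in the artifactId is the Scala binary version and should match the Scala version of your Flink dependencies).
<properties>
    ...
    <bahir.version>1.0</bahir.version>
</properties>
<dependencies>
    ...
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>${bahir.version}</version>
    </dependency>
</dependencies>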
package com.simoniu.flink.scala.stream.sink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
 * Requirement: receive data sent over a socket and save it to a Redis list.
 * Created by simoniu
 */
object StreamRedisSinkScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Connect to the socket to get the input data
    val text = env.socketTextStream("master", 9001)
    // Brings the implicit TypeInformation that map() needs into scope
    import org.apache.flink.api.scala._
    // Build the records as Tuple2 values:
    // the first element is the key (name) of the Redis list,
    // the second element is the element to push onto that list
    val listData = text.map(word => ("l_words_scala", word))
    // Configure the Redis sink (host, port and password of the Redis server)
    val conf = new FlinkJedisPoolConfig.Builder().setHost("master").setPort(6379).setPassword("helloworld").build()
    val redisSink = new RedisSink[Tuple2[String, String]](conf, new MyRedisMapper)
    listData.addSink(redisSink)
    env.execute("StreamRedisSinkScala")
  }

  class MyRedisMapper extends RedisMapper[Tuple2[String, String]] {
    // Specify the Redis command to run for each record
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.LPUSH)
    }
    // Extract the key
    override def getKeyFromData(data: (String, String)): String = {
      data._1
    }
    // Extract the value
    override def getValueFromData(data: (String, String)): String = {
      data._2
    }
  }
}
Note: before running the code, start the socket server and the Redis service.
[root@master ~]# nc -l 9001
hello linux
hello flink
In Redis, run monitor to watch the incoming commands, then lrange to view the result:
127.0.0.1:6379> monitor
OK
1700841324.885770 [0 192.168.128.1:8807] "LPUSH" "l_words_scala" "hello linux"
1700841334.546623 [0 192.168.128.1:8811] "LPUSH" "l_words_scala" "hello flink"
^Z
127.0.0.1:6379> lrange l_words_scala 0 -1
1) "hello flink"
2) "hello linux"
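Note that lrange lists "hello flink" before "hello linux" even though "hello linux" was sent first: LPUSH inserts each new element at the head of the list. If arrival order should be preserved, the mapper could return RedisCommand.RPUSH instead of RedisCommand.LPUSH, since the Bahir RedisCommand enum also has an RPUSH constant and RPUSH appends to the tail. The plain-Java sketch below models only the list semantics with an ArrayDeque; `ListPushOrder` is a hypothetical class and does not talk to a Redis server.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a Redis list; it does not talk to a real Redis server.
class ListPushOrder {
    // LPUSH inserts every new element at the head of the list.
    static List<String> lpushAll(List<String> arrivals) {
        Deque<String> list = new ArrayDeque<>();
        for (String value : arrivals) {
            list.addFirst(value);
        }
        return new ArrayList<>(list); // same order as LRANGE key 0 -1
    }

    // RPUSH appends every new element at the tail of the list.
    static List<String> rpushAll(List<String> arrivals) {
        Deque<String> list = new ArrayDeque<>();
        for (String value : arrivals) {
            list.addLast(value);
        }
        return new ArrayList<>(list);
    }

    public static void main(String[] args) {
        List<String> arrivals = List.of("hello linux", "hello flink");
        System.out.println(lpushAll(arrivals)); // [hello flink, hello linux]
        System.out.println(rpushAll(arrivals)); // [hello linux, hello flink]
    }
}
```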
2) Java implementation
package com.simoniu.flink.java.stream.sink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Requirement: receive data sent over a socket and save it to a Redis list.
 * Created by simoniu
 */
public class StreamRedisSinkJavaDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Connect to the socket to get the input data
        DataStreamSource<String> text = env.socketTextStream("master", 9001);
        // Build the records: (name of the Redis list, element to push onto it)
        SingleOutputStreamOperator<Tuple2<String, String>> listData = text.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String word) throws Exception {
                return new Tuple2<String, String>("l_words_java", word);
            }
        });
        // Configure the Redis sink (host, port and password of the Redis server)
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("master").setPort(6379).setPassword("helloworld").build();
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        listData.addSink(redisSink);
        env.execute("StreamRedisSinkJava");
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
        // Specify the Redis command to run for each record
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }
        // Extract the key
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }
        // Extract the value
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }
    }
}
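RedisSink does the actual writing; MyRedisMapper only answers, for each record, which command to run, which key to use and which value to write. The plain-Java sketch below imitates that split so the data flow can be traced without Flink or Redis. `ListMapper`, `PairMapper` and `sinkAll` are hypothetical names, not Bahir API; Map.Entry stands in for Tuple2, and the command is fixed to an LPUSH-style write instead of being read from the mapper.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the mapper/sink split; no Flink or Redis classes are used.
class MapperModelDemo {
    // Hypothetical counterpart of RedisMapper: it only knows how to read a record.
    interface ListMapper<T> {
        String keyOf(T data);   // plays the role of getKeyFromData
        String valueOf(T data); // plays the role of getValueFromData
    }

    // Hypothetical counterpart of MyRedisMapper for (list name, element) pairs.
    static class PairMapper implements ListMapper<Map.Entry<String, String>> {
        public String keyOf(Map.Entry<String, String> data) { return data.getKey(); }
        public String valueOf(Map.Entry<String, String> data) { return data.getValue(); }
    }

    // Generic "sink": performs an LPUSH-style write for each record,
    // asking the mapper which list (key) and which element (value) to use.
    static <T> Map<String, List<String>> sinkAll(List<T> records, ListMapper<T> mapper) {
        Map<String, Deque<String>> lists = new HashMap<>();
        for (T record : records) {
            lists.computeIfAbsent(mapper.keyOf(record), k -> new ArrayDeque<>())
                 .addFirst(mapper.valueOf(record));
        }
        // Snapshot each list head-to-tail, like LRANGE key 0 -1.
        Map<String, List<String>> snapshot = new HashMap<>();
        lists.forEach((key, list) -> snapshot.put(key, new ArrayList<>(list)));
        return snapshot;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("l_words_java", "hello spark"),
                Map.entry("l_words_java", "hello hive"));
        System.out.println(sinkAll(records, new PairMapper()));
    }
}
```

Running main prints {l_words_java=[hello hive, hello spark]}, the same order that lrange shows for l_words_java in the run below.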
Again, start the socket server and the Redis service first.
[root@master ~]# nc -l 9001
hello spark
hello hive
In Redis, run monitor to watch the incoming commands, then lrange to view the result:
127.0.0.1:6379> monitor
OK
1700841864.786425 [0 192.168.128.1:9110] "LPUSH" "l_words_java" "hello spark"
1700841864.796515 [0 192.168.128.1:9111] "LPUSH" "l_words_java" "hello hive"
^Z
127.0.0.1:6379> lrange l_words_java 0 -1
1) "hello hive"
2) "hello spark"