DataSet API之Transformation-outerJoin
1) Scala实现
package com.simoniu.flink.scala.batch.transformation
import org.apache.flink.api.scala.ExecutionEnvironment
/**
 * Demonstrates the three outer-join variants of the Flink DataSet API:
 *
 *  1. leftOuterJoin
 *  2. rightOuterJoin
 *  3. fullOuterJoin
 *
 * Two small in-memory data sets (user id -> user name, user id -> city) are joined
 * on the user id and each joined result is printed.
 *
 * Created by simoniu
 */
object BatchOuterJoinScalaDemo {

  /** Text emitted for a field whose side of the join had no matching record. */
  private final val MissingValue = "null"

  /**
   * Combines one (id, name) record with one (id, city) record into (id, name, city).
   *
   * In an outer join the side without a match is handed to the join function as `null`;
   * the missing field is then replaced by [[MissingValue]].
   *
   * @param user the (user id, user name) record, or `null` when only the city side matched
   * @param city the (user id, city) record, or `null` when only the user side matched
   * @return the merged (user id, user name, city) triple
   */
  private def mergeUserCity(user: (Int, String), city: (Int, String)): (Int, String, String) =
    if (user == null) (city._1, MissingValue, city._2)
    else if (city == null) (user._1, user._2, MissingValue)
    else (user._1, user._2, city._2)

  /**
   * Builds the two sample data sets and prints the left, right and full outer join of them.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // First data set: Tuple2<user id, user name>
    val text1 = env.fromCollection(Array((1, "刘强东"), (2, "马斯克"), (3, "马化腾")))
    // Second data set: Tuple2<user id, city of the user>
    val text2 = env.fromCollection(Array((1, "北京"), (2, "上海"), (4, "广州")))

    // leftOuterJoin: every record of text1 is kept, so `second` may be null
    text1.leftOuterJoin(text2)
      .where(0)
      .equalTo(0) {
        (first, second) => mergeUserCity(first, second)
      }.print()
    println("========================================")

    // rightOuterJoin: every record of text2 is kept, so `first` may be null
    text1.rightOuterJoin(text2)
      .where(0)
      .equalTo(0) {
        (first, second) => mergeUserCity(first, second)
      }.print()
    println("========================================")

    // fullOuterJoin: records of both sides are kept, so either `first` or `second` may be null
    text1.fullOuterJoin(text2)
      .where(0)
      .equalTo(0) {
        (first, second) => mergeUserCity(first, second)
      }.print()
  }
}
运行结果:
(3,马化腾,null)
(1,刘强东,北京)
(2,马斯克,上海)
========================================
(1,刘强东,北京)
(4,null,广州)
(2,马斯克,上海)
========================================
(3,马化腾,null)
(1,刘强东,北京)
(4,null,广州)
(2,马斯克,上海)
2) Java实现
package com.simoniu.flink.java.batch.transformation;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.ArrayList;
/**
 * Demonstrates the three outer-join variants of the Flink DataSet API:
 * <ol>
 *   <li>leftOuterJoin</li>
 *   <li>rightOuterJoin</li>
 *   <li>fullOuterJoin</li>
 * </ol>
 * Two small in-memory data sets (user id -> user name, user id -> city) are joined
 * on the user id and each joined result is printed.
 *
 * Created by simoniu
 */
public class BatchOuterJoinJavaDemo {

    /** Text emitted for a field whose side of the join had no matching record. */
    private static final String MISSING_VALUE = "null";

    /**
     * Builds the two sample data sets and prints the left, right and full outer join of them.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or running one of the join jobs fails
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // First data set: Tuple2<user id, user name>
        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<>(1, "刘强东"));
        data1.add(new Tuple2<>(2, "马斯克"));
        data1.add(new Tuple2<>(3, "马化腾"));
        DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);

        // Second data set: Tuple2<user id, city of the user>
        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<>(1, "北京"));
        data2.add(new Tuple2<>(2, "上海"));
        data2.add(new Tuple2<>(4, "广州"));
        DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);

        // leftOuterJoin: every record of text1 is kept, so the second argument may be null
        text1.leftOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new UserCityJoiner())
                .print();
        System.out.println("==============================================");

        // rightOuterJoin: every record of text2 is kept, so the first argument may be null
        text1.rightOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new UserCityJoiner())
                .print();
        System.out.println("==============================================");

        // fullOuterJoin: records of both sides are kept, so either argument may be null
        text1.fullOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new UserCityJoiner())
                .print();
    }

    /**
     * Combines one (id, name) record with one (id, city) record into (id, name, city).
     * The side without a match is passed as {@code null}; its field is replaced by
     * {@link #MISSING_VALUE}.
     */
    private static final class UserCityJoiner
            implements JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>> {

        private static final long serialVersionUID = 1L;

        /**
         * @param first  the (user id, user name) record, or {@code null} when only the city side matched
         * @param second the (user id, city) record, or {@code null} when only the user side matched
         * @return the merged (user id, user name, city) triple
         */
        @Override
        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second)
                throws Exception {
            if (first == null) {
                return new Tuple3<>(second.f0, MISSING_VALUE, second.f1);
            }
            if (second == null) {
                return new Tuple3<>(first.f0, first.f1, MISSING_VALUE);
            }
            return new Tuple3<>(first.f0, first.f1, second.f1);
        }
    }
}