DataSetAPI之Transformation-join
下面继续介绍其余的 transformation 算子,本节讲解 join(内连接)。

join用法案例:
1) Scala 实现
package com.simoniu.flink.scala.batch.transformation
import org.apache.flink.api.scala.ExecutionEnvironment
/**
 * join: inner join of two DataSets using Flink's Scala DataSet API.
 *
 * Both inputs are (userId, value) pairs. Records are paired when tuple
 * position 0 (the user id) is equal on both sides, so ids that appear on
 * only one side (3 on the left, 4 on the right) produce no output.
 *
 * Created by simoniu
 */
object BatchJoinScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Brings the Scala API's implicit TypeInformation derivation into scope.
    import org.apache.flink.api.scala._

    // Left input: (user id, user name)
    val names = env.fromCollection(Array((1, "刘强东"), (2, "马斯克"), (3, "马化腾")))
    // Right input: (user id, city of residence)
    val cities = env.fromCollection(Array((1, "北京"), (2, "上海"), (4, "广州")))

    // where(0) picks the key position of the left input and equalTo(0) that of the
    // right input; together they behave like SQL `ON left._1 = right._1`.
    val joined = names
      .join(cities)
      .where(0)
      .equalTo(0) { (left, right) => (left._1, left._2, right._2) }

    // In the DataSet API, print() triggers job execution and writes to stdout.
    joined.print()
  }
}
运行结果(id 为 3 和 4 的记录在另一侧没有匹配项,因此不会出现在内连接结果中;多并行度下输出顺序可能不同):
(1,刘强东,北京)
(2,马斯克,上海)
2) Java 实现
package com.simoniu.flink.java.batch.transformation;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.ArrayList;
import java.util.Arrays;
/**
 * join: inner join of two DataSets using Flink's Java DataSet API.
 *
 * <p>Both inputs hold {@code Tuple2<userId, value>} records. Records are paired when
 * tuple field 0 (the user id) is equal on both sides, so ids that appear on only one
 * side (3 on the left, 4 on the right) produce no output.
 *
 * Created by simoniu
 */
public class BatchJoinJavaDemo {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Left input: Tuple2<user id, user name>
        ArrayList<Tuple2<Integer, String>> idToName = new ArrayList<>(Arrays.asList(
                new Tuple2<>(1, "刘强东"),
                new Tuple2<>(2, "马斯克"),
                new Tuple2<>(3, "马化腾")));
        DataSource<Tuple2<Integer, String>> names = env.fromCollection(idToName);

        // Right input: Tuple2<user id, city of residence>
        ArrayList<Tuple2<Integer, String>> idToCity = new ArrayList<>(Arrays.asList(
                new Tuple2<>(1, "北京"),
                new Tuple2<>(2, "上海"),
                new Tuple2<>(4, "广州")));
        DataSource<Tuple2<Integer, String>> cities = env.fromCollection(idToCity);

        // where(0) selects the key field of the left input and equalTo(0) that of the
        // right input; together they behave like SQL "ON left.f0 = right.f0".
        names.join(cities)
                .where(0)
                .equalTo(0)
                .with(new NameCityJoiner())
                // In the DataSet API, print() triggers execution and writes to stdout.
                .print();
    }

    /**
     * Merges one matched pair of records into a single Tuple3.
     *
     * <p>Type parameters of {@link JoinFunction}, in order: the left input type, the right
     * input type, and the output type. Here (id, name) + (id, city) becomes (id, name, city).
     */
    private static final class NameCityJoiner
            implements JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>> {

        @Override
        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> user, Tuple2<Integer, String> city) {
            return new Tuple3<>(user.f0, user.f1, city.f1);
        }
    }
}