← 返回首页
Transformation-outerJoin
发表时间:2023-11-25 16:16:26
DataSet API 之 Transformation-outerJoin

DataSet API 之 Transformation-outerJoin

1. DataSet API 之 Transformation-outerJoin

1).scala实现

package com.simoniu.flink.scala.batch.transformation

import org.apache.flink.api.scala.ExecutionEnvironment

/**
 * Demonstrates the three outer-join variants of the Flink DataSet API:
 *
 *  1. `leftOuterJoin`  - every record of the left data set appears in the result
 *  2. `rightOuterJoin` - every record of the right data set appears in the result
 *  3. `fullOuterJoin`  - every record of both data sets appears in the result
 *
 * Whenever one side has no record for a key, the join function receives `null`
 * for that side; this demo renders the gap as the literal text "null".
 *
 * Created by simoniu
 */
object BatchOuterJoinScalaDemo {

  /** Text emitted in place of a name or city when that side of the join is absent. */
  private final val Missing = "null"

  /**
   * Builds two small in-memory data sets, runs the left, right and full outer join
   * on the user id (field 0) and prints each result, separated by a line of '='.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    // First data set: (user id, user name)
    val users = env.fromCollection(Array((1, "刘强东"), (2, "马斯克"), (3, "马化腾")))
    // Second data set: (user id, city of the user)
    val cities = env.fromCollection(Array((1, "北京"), (2, "上海"), (4, "广州")))

    // leftOuterJoin: the right-hand (city) record may be null
    users.leftOuterJoin(cities)
      .where(0)
      .equalTo(0) {
        (first, second) => mergeRow(first, second)
      }.print()

    println("========================================")

    // rightOuterJoin: the left-hand (user) record may be null
    users.rightOuterJoin(cities)
      .where(0)
      .equalTo(0) {
        (first, second) => mergeRow(first, second)
      }.print()


    println("========================================")

    // fullOuterJoin: either the user record or the city record may be null
    users.fullOuterJoin(cities)
      .where(0)
      .equalTo(0) {
        (first, second) => mergeRow(first, second)
      }.print()
  }

  /**
   * Merges one joined pair into a `(id, name, city)` row.
   *
   * A single implementation serves all three join variants: it tolerates a missing
   * record on either side, which is a superset of what the left and right joins need.
   *
   * @param user `(id, name)` from the left data set, or `null` if there is no match
   * @param city `(id, city)` from the right data set, or `null` if there is no match
   * @return the id of whichever side is present, with [[Missing]] standing in for
   *         the name or city of the absent side
   */
  private def mergeRow(user: (Int, String), city: (Int, String)): (Int, String, String) = {
    // Option(...) turns the null handed over by the join into None.
    val maybeUser = Option(user)
    val maybeCity = Option(city)
    // The id comes from the user when present; `city` is only dereferenced otherwise.
    val id = maybeUser.map(_._1).getOrElse(city._1)
    (id, maybeUser.fold(Missing)(_._2), maybeCity.fold(Missing)(_._2))
  }
}

运行结果:

(3,马化腾,null)
(1,刘强东,北京)
(2,马斯克,上海)
========================================
(1,刘强东,北京)
(4,null,广州)
(2,马斯克,上海)
========================================
(3,马化腾,null)
(1,刘强东,北京)
(4,null,广州)
(2,马斯克,上海)

2).java实现

package com.simoniu.flink.java.batch.transformation;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

import java.util.ArrayList;

/**
 * Demonstrates the three outer-join variants of the Flink DataSet API:
 * <ol>
 *   <li>{@code leftOuterJoin}  - every record of the left data set appears in the result</li>
 *   <li>{@code rightOuterJoin} - every record of the right data set appears in the result</li>
 *   <li>{@code fullOuterJoin}  - every record of both data sets appears in the result</li>
 * </ol>
 * Whenever one side has no record for a key, the join function receives {@code null}
 * for that side; this demo renders the gap as the literal text "null".
 *
 * Created by simoniu
 */
public class BatchOuterJoinJavaDemo {

    /** Text emitted in place of a name or city when that side of the join is absent. */
    private static final String MISSING = "null";

    /**
     * Builds two small in-memory data sets, runs the left, right and full outer join
     * on the user id (field 0) and prints each result, separated by a line of '='.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing one of the batch jobs fails
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // First data set: Tuple2<user id, user name>
        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<>(1, "刘强东"));
        data1.add(new Tuple2<>(2, "马斯克"));
        data1.add(new Tuple2<>(3, "马化腾"));
        DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
        // Second data set: Tuple2<user id, city of the user>
        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<>(1, "北京"));
        data2.add(new Tuple2<>(2, "上海"));
        data2.add(new Tuple2<>(4, "广州"));
        DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);

        // leftOuterJoin: the right-hand (city) record may be null
        text1.leftOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new OuterJoinMerger())
                .print();
        System.out.println("==============================================");
        // rightOuterJoin: the left-hand (user) record may be null
        text1.rightOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new OuterJoinMerger())
                .print();
        System.out.println("==============================================");
        // fullOuterJoin: either the user record or the city record may be null
        text1.fullOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new OuterJoinMerger())
                .print();
    }

    /**
     * Merges one joined pair into an {@code (id, name, city)} row.
     *
     * A single implementation serves all three join variants: it tolerates a missing
     * record on either side, which is a superset of what the left and right joins need.
     * It is a named class with concrete type arguments, like the anonymous classes it
     * replaces, rather than a lambda.
     */
    private static final class OuterJoinMerger
            implements JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>> {

        /**
         * @param first  {@code (id, name)} from the left data set, or {@code null} if there is no match
         * @param second {@code (id, city)} from the right data set, or {@code null} if there is no match
         * @return the id of whichever side is present, with {@link #MISSING} standing in
         *         for the name or city of the absent side
         */
        @Override
        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) {
            if (first == null) {
                // Only the city side exists: take the id from it.
                return new Tuple3<>(second.f0, MISSING, second.f1);
            }
            if (second == null) {
                // Only the user side exists.
                return new Tuple3<>(first.f0, first.f1, MISSING);
            }
            return new Tuple3<>(first.f0, first.f1, second.f1);
        }
    }
}