← 返回首页
Transformation-join
发表时间:2023-11-25 16:04:24
DataSetAPI之Transformation-join

DataSetAPI之Transformation-join

下面还有一些transformation算子。

1.DataSetAPI之Transformation-join

join用法案例:

1).scala实现

package com.simoniu.flink.scala.batch.transformation

import org.apache.flink.api.scala.ExecutionEnvironment

/**
 * join:内连接
 * Created by simoniu
 */
object BatchJoinScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    //初始化第一份数据  Tuple2<用户id,用户姓名>
    val text1 = env.fromCollection(Array((1, "刘强东"), (2, "马斯克"), (3, "马化腾")))
    //初始化第二份数据 Tuple2<用户id,用户所在城市>
    val text2 = env.fromCollection(Array((1, "北京"), (2, "上海"), (4, "广州")))

    //对两份数据集执行join操作
    text1.join(text2)
      //注意:这里的where和equalTo实现了类似于on fieldA=fieldB的效果
      //where:指定左边数据集中参与比较的元素角标
      .where(0)
      //equalTo指定右边数据集中参与比较的元素角标
      .equalTo(0){(first,second)=>{
        (first._1,first._2,second._2)
      }}.print()
  }
}

运行结果:

(1,刘强东,北京)
(2,马斯克,上海)

2).java实现

package com.simoniu.flink.java.batch.transformation;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

import java.util.ArrayList;
import java.util.Arrays;

/**
 * join:内连接
 * Created by simoniu
 */
public class BatchJoinJavaDemo {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //初始化第一份数据 Tuple2<用户id,用户姓名>
        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<Integer, String>(1, "刘强东"));
        data1.add(new Tuple2<Integer, String>(2, "马斯克"));
        data1.add(new Tuple2<Integer, String>(3, "马化腾"));
        DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
        //初始化第二份数据 Tuple2<用户id,用户所在城市>
        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<Integer, String>(1, "北京"));
        data2.add(new Tuple2<Integer, String>(2, "上海"));
        data2.add(new Tuple2<Integer, String>(4, "广州"));
        DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);

        //对两份数据集执行join操作
        text1.join(text2)
                .where(0)
                .equalTo(0)
                //三个输入参数:
                //第一个tuple2是左边数据集的类型,
                //第二个tuple2是右边数据集的类型,
                //第三个tuple3是此函数返回的数据集类型
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second)
                            throws Exception {
                        return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);
                    }
                }).print();

    }
}