← 返回首页
DataStream API之Transformation
发表时间:2023-11-22 15:02:59
DataStream API之Transformation

DataStream API之Transformation。

1.Transformation

transformation是Flink程序的计算算子,负责对数据进行处理,Flink提供了大量的算子,其实Flink中的大部分算子的使用和spark中算子的使用是一样的,下面常用算子:

这些算子用法其实和spark中对应算子的用法基本是一致的,这里面的map、flatmap、keyBy、reduce、sum这些算子我们都用过了。union:表示合并多个流,但是多个流的数据类型必须一致。多个流join之后,就变成了一个流。

union的应用场景:多种数据源的数据类型一致,数据处理规则也一致。

union算子案例:

1).scala实现:

package com.simoniu.flink.scala.stream.transformation

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * 合并多个流,多个流的数据类型必须一致
 * 应用场景:多种数据源的数据类型一致,数据处理规则也一致
 * Created by simoniu
 */
object StreamUnionScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    //第1份数据流
    val text1 = env.fromCollection(Array("java", "c", "html", "python", "c#"))
    //第2份数据流
    val text2 = env.fromCollection(Array("javascript", "go", "linux", "sql", "rust"))

    //合并流
    val unionStream = text1.union(text2)
    //打印流中的数据
    unionStream.print().setParallelism(1)
    env.execute("StreamUnionScala")
  }
}

运行结果:

java
c
html
python
c#
javascript
go
linux
sql
rust

2).java实现:

package com.simoniu.flink.java.stream.transformation;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * 合并多个流,多个流的数据类型必须一致
 * 应用场景:多种数据源的数据类型一致,数据处理规则也一致
 * Created by simoniu
 */
public class StreamUnionJavaDemo {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //第1份数据流
        DataStreamSource<String> text1 = env.fromCollection(Arrays.asList("java", "c", "html", "python", "c#"));
        //第2份数据流
        DataStreamSource<String> text2 = env.fromCollection(Arrays.asList("javascript", "go", "linux", "sql", "rust"));
        //合并流
        DataStream<String> unionStream = text1.union(text2);
        //打印流中的数据
        unionStream.print().setParallelism(1);
        env.execute("StreamUnionJava");
    }
}

connect:只能连接两个流,两个流的数据类型可以不同。

connect算子案例:

1).scala实现:

package com.simoniu.flink.scala.stream.transformation

import org.apache.flink.streaming.api.functions.co.CoMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * 只能连接两个流,两个流的数据类型可以不同
 * 应用:可以将两种不同格式的数据统一成一种格式
 * Created by simoniu
 */
object StreamConnectScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    //第1份数据流,属性使用空格分隔
    val text1 = env.fromElements("name:tom gender:male age:18")
    //第2份数据流,属性使用下划线分隔
    val text2 = env.fromElements("name:jack_age:20")

    //连接两个流
    val connectStream = text1.connect(text2)
    connectStream.map(new CoMapFunction[String, String, String] {
      //处理第1份数据流中的数据
      override def map1(value: String): String = {
        value.replace(" ", ",")
      }
      //处理第2份数据流中的数据
      override def map2(value: String): String = {
        value.replace("_", ",")
      }
    }).print().setParallelism(1)
    env.execute("StreamConnectScala")
  }
}

运行结果:

name:tom,gender:male,age:18
name:jack,age:20

2).java实现:

package com.simoniu.flink.java.stream.transformation;

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import java.util.Arrays;

/**
 * 只能连接两个流,两个流的数据类型可以不同
 * Created by simoniu
 */
public class StreamConnectJavaDemo {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //第1份数据流
        DataStreamSource<String> text1 = env.fromElements("name:tom gender:male age:18");
        //第2份数据流
        DataStreamSource<String> text2 = env.fromElements("name:jack_age:20");
        //连接两个流
        ConnectedStreams<String, String> connectStream = text1.connect(text2);
        connectStream.map(new CoMapFunction<String, String, String>() {
            //处理第1份数据流中的数据
            @Override
            public String map1(String value) throws Exception {
                return value.replace(" ",",");
            }
            //处理第2份数据流中的数据
            @Override
            public String map2(String value) throws Exception {
                return value.replace("_",",");
            }
        }).print().setParallelism(1);
        env.execute("StreamConnectJava");
    }
}

split:根据规则把一个数据流切分为多个流。注意:split只能分一次流,切分出来的流不能继续分流。

split需要和select配合使用,选择切分后的流。应用场景:将一份数据流切分为多份,便于针对每一份数据使用不同的处理逻辑。

split算子案例:

1).scala实现:

package com.simoniu.flink.scala.stream.transformation

import java.{lang, util}
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * 根据规则把一个数据流切分为多个流
 * 注意:split只能分一次流,切分出来的流不能继续分流
 * split需要和select配合使用,选择切分后的流
 * 应用场景:将一份数据流切分为多份,便于针对每一份数据使用不同的处理逻辑
 * Created by simoniu
 */
object StreamSplitScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    val text = env.fromCollection(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

    //按照数据的奇偶性对数据进行分流
    val splitStream = text.split(new OutputSelector[Int] {
      override def select(value: Int): lang.Iterable[String] = {
        val list = new util.ArrayList[String]()
        if(value % 2 == 0){
          list.add("even")//偶数
        }else{
          list.add("odd")//奇数
        }
        list
      }
    })

    //选择流
    val evenStream = splitStream.select("even")
    evenStream.print().setParallelism(1)

    //二次切分流会报错
    //Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs
    /*val lowHighStream = evenStream.split(new OutputSelector[Int] {
      override def select(value: Int): lang.Iterable[String] = {
        val list = new util.ArrayList[String]()
        if(value <= 5){
          list.add("low");
        }else{
          list.add("high")
        }
        list
      }
    })
    val lowStream = lowHighStream.select("low")
    lowStream.print().setParallelism(1)*/
    env.execute("StreamSplitScala")
  }
}

运行结果:

2
4
6
8
10

2).java实现

package com.simoniu.flink.java.stream.transformation;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.Arrays;

/**
 * 根据规则把一个数据流切分为多个流
 * 注意:split只能分一次流,切分出来的流不能继续分流
 * split需要和select配合使用,选择切分后的流
 * 应用场景:将一份数据流切分为多份,便于针对每一份数据使用不同的处理逻辑
 * Created by simoniu
 */
public class StreamSplitJavaDemo {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> text = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        //按照数据的奇偶性对数据进行分流
        SplitStream<Integer> splitStream = text.split(new OutputSelector<Integer>() {
            @Override
            public Iterable<String> select(Integer value) {
                ArrayList<String> list = new ArrayList<>();
                if (value % 2 == 0) {
                    list.add("even");//偶数
                } else {
                    list.add("odd");//奇数
                }
                return list;
            }
        });
        //选择流
        DataStream<Integer> evenStream = splitStream.select("even");
        evenStream.print().setParallelism(1);
        env.execute("StreamSplitJava");
    }
}

目前split切分的流无法进行二次切分,并且split方法已经标记为过时了,官方不推荐使用,现在官方推荐使用side output的方式实现。

side output实现流的多次切分案例:

1).scala实现

package com.simoniu.flink.scala.stream.transformation

import java.{lang, util}
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

/**
 * 使用sideoutput切分流
 * Created by simoniu
 */
object StreamSideOutputScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    val text = env.fromCollection(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))


    //按照数据的奇偶性对数据进行分流
    //首先定义两个sideoutput来准备保存切分出来的数据
    val outputTag1 = new OutputTag[Int]("even") {} //保存偶数
    val outputTag2 = new OutputTag[Int]("odd") {} //保存奇数
    //注意:process属于Flink中的低级api
    val outputStream = text.process(new ProcessFunction[Int, Int] {
      override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = {
        if (value % 2 == 0) {
          ctx.output(outputTag1, value)
        } else {
          ctx.output(outputTag2, value)
        }
      }
    })

    //获取偶数数据流
    val evenStream = outputStream.getSideOutput(outputTag1)
    //获取奇数数据流
    val oddStream = outputStream.getSideOutput(outputTag2)
    //evenStream.print().setParallelism(1)

    //对evenStream流进行二次切分
    val outputTag11 = new OutputTag[Int]("low") {} //保存小于等五5的数字
    val outputTag12 = new OutputTag[Int]("high") {} //保存大于5的数字

    val subOutputStream = evenStream.process(new ProcessFunction[Int, Int] {
      override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = {
        if (value <= 5) {
          ctx.output(outputTag11, value)
        } else {
          ctx.output(outputTag12, value)
        }
      }
    })
    //获取小于等于5的数据流
    val lowStream = subOutputStream.getSideOutput(outputTag11)
    //获取大于5的数据流
    val highStream = subOutputStream.getSideOutput(outputTag12)

    lowStream.print("low=>").setParallelism(1)

    highStream.print("high=>").setParallelism(1)

    env.execute("StreamSideOutputScala")
  }
}

运行结果:

low=>> 2
high=>> 8
low=>> 4
high=>> 6
high=>> 10

2).java实现

package com.simoniu.flink.java.stream.transformation;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Arrays;

/**
 * 使用sideoutput切分流
 * Created by simoniu
 */
public class StreamSideoutputJavaDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> text = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));

        //按照数据的奇偶性对数据进行分流
        //首先定义两个sideoutput来准备保存切分出来的数据
        OutputTag<Integer> outputTag1 = new OutputTag<Integer>("even") {
        };
        OutputTag<Integer> outputTag2 = new OutputTag<Integer>("odd") {
        };

        SingleOutputStreamOperator<Integer> outputStream = text.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out)
                    throws Exception {
                if (value % 2 == 0) {
                    ctx.output(outputTag1, value);
                } else {
                    ctx.output(outputTag2, value);
                }
            }
        });

        //获取偶数数据流
        DataStream<Integer> evenStream = outputStream.getSideOutput(outputTag1);
        //获取奇数数据流
        DataStream<Integer> oddStream = outputStream.getSideOutput(outputTag2);

        //对evenStream流进行二次切分
        OutputTag<Integer> outputTag11 = new OutputTag<Integer>("low") {
        };
        OutputTag<Integer> outputTag12 = new OutputTag<Integer>("high") {
        };
        SingleOutputStreamOperator<Integer> subOutputStream = evenStream.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out)
                    throws Exception {
                if (value <= 5) {
                    ctx.output(outputTag11, value);
                } else {
                    ctx.output(outputTag12, value);
                }
            }
        });

        //获取小于等于5的数据流
        DataStream<Integer> lowStream = subOutputStream.getSideOutput(outputTag11);
        //获取大于5的数据流
        DataStream<Integer> highStream = subOutputStream.getSideOutput(outputTag12);

        lowStream.print("low=>").setParallelism(1);
        highStream.print("high=>").setParallelism(1);

        env.execute("StreamSideoutputJava");

    }
}

其实想要实现多级流切分,使用filter算子也可以实现。

以上算子总结如下:

union可以连接多个流,最后汇总成一个流,流里面的数据使用相同的计算规则 connect值可以连接2个流,最后汇总成一个流,但是流里面的两份数据相互还是独立的,每一份数据使用一个计算规则。

如果是只需要切分一次的话使用split或者side output都可以,如果想要切分多次,就不能使用split了,需要使用side output。