← 返回首页
自定义Window的使用
发表时间:2023-11-30 15:07:51
自定义Window的使用

自定义Window的使用。

1.自定义window

其实window还可以再细分一下,分为以下两种:

咱们前面演示的都是基于key的window,就是在使用window之前,先执行了keyBy分组操作,如果需求中不需要根据key进行分组的话,可以不使用keyBy,这样在使用window的时候需要使用timeWindowAll()和countWindowAll()。

如果是自定义window的话该如何使用呢?

针对基于key的window需要使用window函数。 针对不基于key的window需要使用windowAll函数。

其实我们前面所说的TimeWindow和TimeWindowAll底层用的就是window和windowAll函数,可以这样理解,TimeWindow是官方封装好的window。

自己使用window函数封装一个MyTimeWindow案例。

1).scala实现

package com.simoniu.flink.scala.window
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * 需求:自定义MyTimeWindow
 * Created by simoniu
 */
object MyTimeWindowScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("master", 9001)

    import org.apache.flink.api.scala._
    //自定义MyTimeWindow滚动窗口:每隔10秒计算一次前10秒时间窗口内的数据
    text.flatMap(_.split(" "))
      .map((_,1))
      .keyBy(0)
      //窗口大小
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .sum(1).print()

    env.execute("MyTimeWindowScala")
  }
}

2).java实现

package com.simoniu.flink.java.window;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * 需求:自定义MyTimeWindow
 * Created by simoniu
 */
public class MyTimeWindowScalaDemo {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("master", 9001);

        //自定义MyTimeWindow滚动窗口:每隔10秒计算一次前10秒时间窗口内的数据
        text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
                            throws Exception {
                        String[] words = line.split(" ");
                        for (String word: words) {
                            out.collect(new Tuple2<String, Integer>(word,1));
                        }
                    }
                }).keyBy(0)
                //窗口大小
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .sum(1)
                .print();

        env.execute("MyTimeWindowScala");
    }
}