← 返回首页
TimeWindow的使用
发表时间:2023-11-29 14:26:06
TimeWindow的使用

TimeWindow的使用。

1.TimeWindow

TimeWindow 是根据时间对数据流切分窗口,TimeWindow可以支持滚动窗口和滑动窗口。

其中,timeWindow(Time.seconds(10)) 方法表示滚动窗口,窗口大小为10秒,即对每10秒内的数据进行一次聚合计算;timeWindow(Time.seconds(10), Time.seconds(5)) 方法表示滑动窗口,窗口大小为10秒,滑动间隔为5秒,也就是每隔5秒计算一次前10秒内的数据。

TimeWindow案例:

1).Scala实现

package com.simoniu.flink.scala.window

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * Demonstrates time-based windows on a socket word-count stream.
 * 1: tumbling window
 * 2: sliding window
 * Created by simoniu
 */
object TimeWindowOpScalaDemo {
  /**
   * Builds and runs the streaming word-count job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Source: lines of text read from a socket on host "master", port 9001.
    val text = env.socketTextStream("master", 9001)

    // Scala API implicits needed by the flatMap/map/keyBy calls below.
    import org.apache.flink.api.scala._

    // Tumbling time window: every 10 seconds, aggregate the data of the preceding 10 seconds.
    /*text.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      // window size
      .timeWindow(Time.seconds(10))
      .sum(1).print()*/

    // Sliding time window: every 5 seconds, aggregate the data of the preceding 10 seconds.
    // NOTE(review): timeWindow is deprecated in newer Flink releases in favor of
    // window(<window assigner>); confirm against the Flink version in use.
    text.flatMap(_.split(" "))
      .map((_, 1))
      // Key by the word itself through a key-selector function; the positional
      // keyBy(0) form is deprecated and yields a weakly-typed tuple key.
      .keyBy(_._1)
      // first argument: window size, second argument: slide interval
      .timeWindow(Time.seconds(10), Time.seconds(5))
      // Sum the per-word counts (tuple field 1) inside each window and print them.
      .sum(1)
      .print()

    env.execute("TimeWindowOpScala")
  }
}

2).Java实现

package com.simoniu.flink.java.window;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * TimeWindow的使用
 * 1:滚动窗口
 * 2:滑动窗口
 * Created by simoniu
 */
public class TimeWindowOpJavaDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("master", 9001);

        //TimeWindow之滚动窗口:每隔10秒计算一次前10秒时间窗口内的数据
        /*text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
                    throws Exception {
                String[] words = line.split(" ");
                for (String word: words) {
                    out.collect(new Tuple2<String, Integer>(word,1));
                }
            }
        }).keyBy(0)
                //窗口大小
                .timeWindow(Time.seconds(10))
                .sum(1)
                .print();*/

        //TimeWindow之滑动窗口:每隔5秒计算一次前10秒时间窗口内的数据
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
                            throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                }).keyBy(0)
                //第一个参数:窗口大小,第二个参数:滑动间隔
                .timeWindow(Time.seconds(10), Time.seconds(5))
                .sum(1)
                .print();

        env.execute("TimeWindowOpJava");
    }
}