自定义Window的使用。
其实window还可以再细分一下,分为以下两种:
咱们前面演示的都是基于key的window,就是在使用window之前,先执行了keyBy分组操作,如果需求中不需要根据key进行分组的话,可以不使用keyBy,这样在使用window的时候需要使用timeWindowAll()和countWindowAll()。
如果是自定义window的话该如何使用呢?

针对基于key的window需要使用window函数。 针对不基于key的window需要使用windowAll函数。
其实我们前面所说的TimeWindow和TimeWindowAll底层用的就是window和windowAll函数,可以这样理解,TimeWindow是官方封装好的window。
自己使用window函数封装一个MyTimeWindow案例。
1).scala实现
package com.simoniu.flink.scala.window
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
/**
* 需求:自定义MyTimeWindow
* Created by simoniu
*/
object MyTimeWindowScalaDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("master", 9001)
import org.apache.flink.api.scala._
//自定义MyTimeWindow滚动窗口:每隔10秒计算一次前10秒时间窗口内的数据
text.flatMap(_.split(" "))
.map((_,1))
.keyBy(0)
//窗口大小
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.sum(1).print()
env.execute("MyTimeWindowScala")
}
}
2).java实现
package com.simoniu.flink.java.window;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* 需求:自定义MyTimeWindow
* Created by simoniu
*/
public class MyTimeWindowScalaDemo {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("master", 9001);
//自定义MyTimeWindow滚动窗口:每隔10秒计算一次前10秒时间窗口内的数据
text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
throws Exception {
String[] words = line.split(" ");
for (String word: words) {
out.collect(new Tuple2<String, Integer>(word,1));
}
}
}).keyBy(0)
//窗口大小
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.sum(1)
.print();
env.execute("MyTimeWindowScala");
}
}