TimeWindow的使用。
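TimeWindow 按时间对数据流切分窗口，支持滚动窗口（Tumbling Window）和滑动窗口（Sliding Window）。
其中 timeWindow(Time.seconds(10)) 表示滚动窗口，窗口大小为 10 秒，即对每 10 秒内的数据进行一次聚合计算；timeWindow(Time.seconds(10), Time.seconds(5)) 表示滑动窗口，窗口大小为 10 秒、滑动间隔为 5 秒，即每隔 5 秒计算一次前 10 秒内的数据。
注意：timeWindow(...) 自 Flink 1.12 起已被标记为废弃。它会根据执行环境的时间语义选择窗口类型，而 1.12 起默认的时间语义是事件时间（EventTime）。因此新版本建议显式使用 window(TumblingProcessingTimeWindows.of(...)) 或 window(SlidingProcessingTimeWindows.of(...))。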
TimeWindow案例:
1) Scala 实现
package com.simoniu.flink.scala.window
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * TimeWindow usage demo.
 *  1: tumbling window
 *  2: sliding window
 *
 * Reads lines from a socket (host "master", port 9001), splits them into words and prints
 * a per-word count for each window.
 *
 * Windows are assigned explicitly on processing time. The deprecated `timeWindow(...)`
 * shortcut instead follows the environment's time characteristic, which defaults to event
 * time since Flink 1.12. Socket records carry no timestamps, so event-time windows would
 * fail at runtime.
 * Created by simoniu
 */
object TimeWindowOpScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("master", 9001)
    import org.apache.flink.api.scala._
    // TumblingProcessingTimeWindows is only needed by the commented-out tumbling example below.
    import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}

    // Tumbling window: every 10 seconds, aggregate the data of the last 10 seconds.
    /*text.flatMap(_.split("\\s+").filter(_.nonEmpty))
      .map((_, 1))
      .keyBy(_._1)
      // window size
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .sum(1)
      .print()*/

    // Sliding window: every 5 seconds, aggregate the data of the previous 10 seconds.
    text
      // split on runs of whitespace and drop empty tokens so blank/extra spaces are not counted as words
      .flatMap(_.split("\\s+").filter(_.nonEmpty))
      .map((_, 1))
      .keyBy(_._1)
      // first argument: window size; second argument: slide interval
      .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
      .sum(1)
      .print()

    env.execute("TimeWindowOpScala")
  }
}
2) Java 实现
package com.simoniu.flink.java.window;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* TimeWindow的使用
* 1:滚动窗口
* 2:滑动窗口
* Created by simoniu
*/
public class TimeWindowOpJavaDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("master", 9001);
//TimeWindow之滚动窗口:每隔10秒计算一次前10秒时间窗口内的数据
/*text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
throws Exception {
String[] words = line.split(" ");
for (String word: words) {
out.collect(new Tuple2<String, Integer>(word,1));
}
}
}).keyBy(0)
//窗口大小
.timeWindow(Time.seconds(10))
.sum(1)
.print();*/
//TimeWindow之滑动窗口:每隔5秒计算一次前10秒时间窗口内的数据
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
throws Exception {
String[] words = line.split(" ");
for (String word : words) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}).keyBy(0)
//第一个参数:窗口大小,第二个参数:滑动间隔
.timeWindow(Time.seconds(10), Time.seconds(5))
.sum(1)
.print();
env.execute("TimeWindowOpJava");
}
}