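Using CountWindow
A CountWindow divides a data stream into windows by the number of elements rather than by time. CountWindow supports both tumbling and sliding windows: a tumbling CountWindow fires once every N elements and then starts a fresh window, while a sliding CountWindow fires every `slide` elements over the most recent N elements.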
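The firing rule can be modeled without Flink. In Flink's `KeyedStream`, `countWindow(size)` is built from a global window with a purging `CountTrigger`, and `countWindow(size, slide)` from a global window with `CountTrigger.of(slide)` plus `CountEvictor.of(size)`. The sketch below is a hypothetical plain-Java model of those rules for a single key (the class and method names are illustrative, not Flink API); it returns the elements that each firing sees.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical plain-Java model of count-window firing for ONE key.
// Not Flink code: it only mimics which elements each firing evaluates.
class CountWindowContents {
    // size:  maximum number of elements a window evaluates
    // slide: fire every `slide` new elements
    // purge: true for tumbling windows (buffer cleared after each firing)
    static List<List<Integer>> windows(List<Integer> elements, int size, int slide, boolean purge) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<List<Integer>> fired = new ArrayList<>();
        int sinceLastFire = 0;
        for (int e : elements) {
            buffer.addLast(e);
            if (++sinceLastFire < slide) {
                continue; // the trigger has not reached its count yet
            }
            sinceLastFire = 0;
            while (buffer.size() > size) {
                buffer.removeFirst(); // keep only the most recent `size` elements
            }
            fired.add(new ArrayList<>(buffer));
            if (purge) {
                buffer.clear(); // tumbling: the next element opens a new window
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Integer> in = List.of(1, 2, 3, 4, 5, 6, 7);
        System.out.println("tumbling(3):   " + windows(in, 3, 3, true));
        System.out.println("sliding(3, 1): " + windows(in, 3, 1, false));
        System.out.println("sliding(3, 2): " + windows(in, 3, 2, false));
    }
}
```

For the input 1..7, the tumbling model yields `[[1, 2, 3], [4, 5, 6]]` (element 7 waits for a window that never fills), `sliding(3, 1)` yields one window per element, and `sliding(3, 2)` yields `[[1, 2], [2, 3, 4], [4, 5, 6]]`.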
CountWindow example:
1) Scala implementation
package com.simoniu.flink.scala.window

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * Using CountWindow
 * 1: tumbling window
 * 2: sliding window
 * Created by simoniu
 */
object CountWindowOpScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read lines of text from the socket on host "master", port 9001
    val text = env.socketTextStream("master", 9001)
    // Brings the implicit TypeInformation required by the Scala API into scope
    import org.apache.flink.api.scala._

    /**
     * Note: because keyBy is used here, the data is grouped by key first.
     * A window fires only when the window of one particular key has collected 5 elements.
     */
    // CountWindow, tumbling: every 5 elements, compute over those 5 elements
    /*text.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      // window size
      .countWindow(5)
      .sum(1).print()*/

    // CountWindow, sliding: every 1 element, compute over the most recent 5 elements
    text.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      // first argument: window size; second argument: slide interval
      .countWindow(5, 1)
      .sum(1).print()

    env.execute("CountWindowOpScala")
  }
}
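The note in the code above says that with keyBy each key counts on its own. The hypothetical plain-Java helper below (names are illustrative, not Flink API) contrasts that with a single shared counter, which is how Flink's non-keyed `countWindowAll(size)` behaves: five distinct words fill a non-keyed window but no keyed window.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not Flink code: counts how many tumbling count windows
// of `size` fire for a word sequence, with and without grouping by word.
class KeyedVsGlobalCount {
    // Models keyBy(word).countWindow(size): every word has its own counter.
    static int keyedFirings(String[] words, int size) {
        Map<String, Integer> counts = new HashMap<>();
        int fired = 0;
        for (String w : words) {
            if (counts.merge(w, 1, Integer::sum) == size) {
                fired++;
                counts.put(w, 0); // window purged; this word starts counting again
            }
        }
        return fired;
    }

    // Models countWindowAll(size): one counter shared by all words.
    static int globalFirings(String[] words, int size) {
        int count = 0;
        int fired = 0;
        for (String ignored : words) {
            if (++count == size) {
                fired++;
                count = 0;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        String[] words = "a b c d e".split(" ");
        System.out.println("keyed:  " + keyedFirings(words, 5));
        System.out.println("global: " + globalFirings(words, 5));
    }
}
```

For `a b c d e`, the keyed model fires 0 windows (each key holds only 1 element) while the shared counter fires 1.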
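Start nc listening on port 9001 on host master before submitting the job (the socket source connects to it as a client), then enter the following test data: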
[root@master ~]# nc -l 9001
hello you
hello me
hello hello hello
you you you you
hello
you
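Running the tumbling-window code (tumbling block enabled, sliding block commented out) produces the output below. `hello` and `you` each reach 5 elements once, so each fires one window; the sixth `hello`, the sixth `you`, and the single `me` sit in windows that never fill up, so they produce no output. The number before `>` identifies the parallel print task that emitted the line: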
3> (hello,5)
5> (you,5)
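To check these numbers, the sketch below replays the same nc input through a hypothetical plain-Java model of `keyBy(word).countWindow(5).sum(1)` (not Flink code; parallelism and the `n>` prefixes are ignored). It also reports how many elements remain buffered in each key's unfinished window.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical replay of keyBy(word).countWindow(size).sum(1) in plain Java.
class TumblingReplay {
    // The test lines typed into nc
    static final String[] INPUT = {
        "hello you", "hello me", "hello hello hello", "you you you you", "hello", "you"
    };

    // Returns fired results as "(word,sum)"; `pending` is filled with the number
    // of elements still buffered in each key's unfinished window.
    static List<String> replay(int size, Map<String, Integer> pending) {
        List<String> fired = new ArrayList<>();
        for (String line : INPUT) {
            for (String word : line.split(" ")) {
                // each word contributes (word, 1), so the sum equals the count
                int count = pending.merge(word, 1, Integer::sum);
                if (count == size) {
                    fired.add("(" + word + "," + count + ")");
                    pending.put(word, 0); // purge: the next element opens a new window
                }
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        Map<String, Integer> pending = new LinkedHashMap<>();
        System.out.println("fired:   " + replay(5, pending));
        System.out.println("pending: " + pending);
    }
}
```

Running it prints `fired:   [(hello,5), (you,5)]` and `pending: {hello=1, you=1, me=1}`, matching the two results above and the three elements left waiting.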
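Running the sliding-window code produces the output below. With a slide of 1, every incoming element triggers a computation over the most recent (up to) 5 elements of its key, so each key's sum grows 1, 2, ..., 5 and then stays at 5; `me` fires once with a count of 1: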
5> (you,1)
3> (hello,1)
4> (me,1)
3> (hello,2)
3> (hello,3)
3> (hello,4)
3> (hello,5)
5> (you,2)
5> (you,3)
5> (you,4)
5> (you,5)
3> (hello,5)
5> (you,5)
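The sliding results can also be checked with a hypothetical plain-Java replay of `keyBy(word).countWindow(size, slide).sum(1)` (not Flink code). Because parallelism is ignored, results come out in input order; the per-key sequences should match the Flink output even though Flink interleaves keys differently. The sketch also tries a hypothetical `countWindow(5, 2)` to show how the slide changes the firing frequency.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical replay of keyBy(word).countWindow(size, slide).sum(1) in plain Java.
class SlidingReplay {
    // The test lines typed into nc
    static final String[] INPUT = {
        "hello you", "hello me", "hello hello hello", "you you you you", "hello", "you"
    };

    static List<String> replay(int size, int slide) {
        Map<String, Deque<Integer>> buffers = new HashMap<>(); // per-key window contents
        Map<String, Integer> sinceFire = new HashMap<>();     // per-key trigger counter
        List<String> fired = new ArrayList<>();
        for (String line : INPUT) {
            for (String word : line.split(" ")) {
                Deque<Integer> buf = buffers.computeIfAbsent(word, k -> new ArrayDeque<>());
                buf.addLast(1); // each word contributes (word, 1)
                if (sinceFire.merge(word, 1, Integer::sum) < slide) {
                    continue;   // this key has not seen `slide` new elements yet
                }
                sinceFire.put(word, 0);
                while (buf.size() > size) {
                    buf.removeFirst(); // keep only the most recent `size` elements
                }
                int sum = 0;
                for (int v : buf) {
                    sum += v;
                }
                fired.add("(" + word + "," + sum + ")");
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println("countWindow(5, 1): " + replay(5, 1));
        System.out.println("countWindow(5, 2): " + replay(5, 2));
    }
}
```

With `countWindow(5, 1)` the replay yields the same 13 results as above. With `countWindow(5, 2)` it yields `[(hello,2), (hello,4), (you,2), (you,4), (hello,5), (you,5)]`, and `me` never fires because it never reaches 2 elements.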
2) Java implementation
package com.simoniu.flink.java.window;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Using CountWindow
 * 1: tumbling window
 * 2: sliding window
 * Created by simoniu
 */
public class CountWindowOpJavaDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read lines of text from the socket on host "master", port 9001
        DataStreamSource<String> text = env.socketTextStream("master", 9001);

        // Split each line into words and emit (word, 1); shared by both windows below
        DataStream<Tuple2<String, Integer>> wordAndOne = text.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
                            throws Exception {
                        for (String word : line.split(" ")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                });

        // Note: both pipelines run in the same job, so their results are interleaved
        // in the output; comment one out to observe a single window type.

        // CountWindow, tumbling: every 5 elements, compute over those 5 elements
        wordAndOne.keyBy(value -> value.f0)
                // window size
                .countWindow(5)
                .sum(1)
                .print();

        // CountWindow, sliding: every 1 element, compute over the most recent 5 elements
        wordAndOne.keyBy(value -> value.f0)
                // first argument: window size; second argument: slide interval
                .countWindow(5, 1)
                .sum(1)
                .print();

        env.execute("CountWindowOpJava");
    }
}