← 返回首页
CountWindow的使用
发表时间:2023-11-29 14:50:40
CountWindow的使用

CountWindow的使用。

1.CountWindow

CountWindow是根据元素个数对数据流切分窗口,CountWindow也可以支持滚动窗口和滑动窗口。

CountWindow案例:

1)scala实现

package com.simoniu.flink.scala.window

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Demonstrates count-based windows (CountWindow) on a keyed word stream.
 *  1. Tumbling window: `countWindow(size)`
 *  2. Sliding window: `countWindow(size, slide)`
 *
 * Created by simoniu
 */
object CountWindowOpScalaDemo {
  /**
   * Builds and runs the word-count job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Source: lines of text read from a socket on host "master", port 9001.
    val text = env.socketTextStream("master", 9001)

    // Wildcard import of the Flink Scala API package, kept local to this method.
    import org.apache.flink.api.scala._

    /*
     * Note: keyBy groups the elements by word first, so a window is only
     * triggered once the window of one particular key has reached 5 elements.
     */
    // Tumbling count window: every 5 elements, aggregate those 5 elements.
    /*text.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      // window size
      .countWindow(5)
      .sum(1).print()*/

    // Sliding count window: every 1 element, aggregate the previous 5 elements.
    text.flatMap(_.split(" "))
      .map((_, 1))
      // Key by the word using a typed selector instead of the deprecated
      // positional keyBy(0).
      .keyBy(_._1)
      // first argument: window size, second argument: slide interval
      .countWindow(5, 1)
      // Sum the count field (tuple position 1) within each window.
      .sum(1).print()

    env.execute("CountWindowOpScala")
  }
}

通过socket输入测试数据:

[root@master ~]# nc -l 9001
hello you
hello me
hello hello hello
you you you you
hello
you

执行CountWindow之滚动窗口的代码,输出结果如下:

3> (hello,5)
5> (you,5)

执行CountWindow之滑动窗口的代码,输出结果如下:

5> (you,1)
3> (hello,1)
4> (me,1)
3> (hello,2)
3> (hello,3)
3> (hello,4)
3> (hello,5)
5> (you,2)
5> (you,3)
5> (you,4)
5> (you,5)
3> (hello,5)
5> (you,5)

2)java实现

package com.simoniu.flink.java.window;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * CountWindow的使用
 * 1:滚动窗口
 * 2:滑动窗口
 * Created by simoniu
 */
public class CountWindowOpJavaDemo {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("master", 9001);

        //CountWindow之滚动窗口:每隔5个元素计算一次前5个元素
        text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
                            throws Exception {
                        String[] words = line.split(" ");
                        for (String word: words) {
                            out.collect(new Tuple2<String, Integer>(word,1));
                        }
                    }
                }).keyBy(0)
                //窗口大小
                .countWindow(5)
                .sum(1)
                .print();

        //CountWindow之滑动窗口:每隔1个元素计算一次前5个元素
        text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
                            throws Exception {
                        String[] words = line.split(" ");
                        for (String word: words) {
                            out.collect(new Tuple2<String, Integer>(word,1));
                        }
                    }
                }).keyBy(0)
                //第一个参数:窗口大小,第二个参数:滑动间隔
                .countWindow(5,1)
                .sum(1)
                .print();

        env.execute("CountWindowOpJava");
    }
}