Flink 统计文本单词数量

📅 2026/6/26 2:20:18
Flink 统计文本单词数量
1.flink统计文本单词数量package com.ycl; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class WordCountDemo { public static void main(String[] args) throws Exception { //1.创建执行环境 ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment(); //2.读取数据,从文件中读取 DataSourceString lineDS env.readTextFile(input/word.txt); //3.切分转换(word,1),匿名类 //AltEnter跳出如下方法的全部内容。 FlatMapOperatorString, Tuple2String, Integer wordAndOne lineDS.flatMap(new FlatMapFunctionString, Tuple2String, Integer() { Override public void flatMap(String value, CollectorTuple2String, Integer out) throws Exception { //3.1 按照空格切分单词 String[] words value.split( ); //3.2将单词转换为(word,1) ,点击.var 会补全所有的行。 for (String word : words) { Tuple2String, Integer wordTuple2 Tuple2.of(word, 1); //3.3 使用 Collector 向下游发送数据; out.collect(wordTuple2); } } }); //4.按照 word分组 UnsortedGroupingTuple2String, Integer wordAndOneGroupBy wordAndOne.groupBy(0); //5.各分组内聚合 1是位置,表示第二个元素; AggregateOperatorTuple2String, Integer sum wordAndOneGroupBy.sum(1); //6.输出 sum.print(); } }输出结果如下