pom.xml
<properties>
    <flink.version>1.13.6</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>2.7.5-10.0</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
    </dependency>
</dependencies>

<build>
    <extensions>
        <extension>
            <groupId>org.apache.maven.wagon</groupId>
            <artifactId>wagon-ssh</artifactId>
            <version>2.8</version>
        </extension>
    </extensions>
    <plugins>
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>wagon-maven-plugin</artifactId>
            <version>1.0</version>
            <configuration>
                <!-- local jar to upload -->
                <fromFile>target/${project.build.finalName}.jar</fromFile>
                <!-- remote destination to copy it to -->
                <url>scp://root:root@bigdata01:/opt/app</url>
            </configuration>
        </plugin>
    </plugins>
</build>
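Note that the wagon plugin above is not bound to any lifecycle phase, so the upload has to be invoked explicitly after packaging. Assuming the fromFile/url configuration shown, the invocation would look something like this (the upload-single goal comes from wagon-maven-plugin):

mvn clean package wagon:upload-single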
Basic version with anonymous inner classes
package com.bigdata;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount_flink_01 {
    public static void main(String[] args) throws Exception {
        // The same call works whether the job runs locally in the IDE or on the cluster, which is very convenient.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
         * 1. AUTOMATIC: decides between batch and streaming based on the nature of the sources
         *    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
         * 2. BATCH: processes the bounded input in one go
         *    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         * 3. STREAMING is the default
         *    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
         */
        // Streaming is the default; printing the results under both modes is a good way to see what batch vs. stream means.
        // env.setRuntimeMode(RuntimeExecutionMode.BATCH); // batch processing
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        // Set the parallelism; it can also be set on each operator individually.
        env.setParallelism(2);

        // Load the data
        DataStreamSource<String> sourceDs = env.fromElements("spark kafka flink", "spark spark spark", "kafka kafka kafka");

        // Transform the data
        SingleOutputStreamOperator<Tuple2<String, Integer>> rsDs = sourceDs
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String s, Collector<String> collector) throws Exception {
                        String[] words = s.split(" ");
                        for (String word : words) {
                            collector.collect(word);
                        }
                    }
                })
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String word) throws Exception {
                        return Tuple2.of(word, 1);
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                        // Group by this field (the word itself)
                        return tuple2.f0;
                    }
                })
                .sum(1);

        // Don't rely on System.out alone: when the job is submitted as a jar,
        // print() output shows up on the TaskManager stdout page of the web UI (port 8081).
        rsDs.print();

        env.execute("word count example");
    }
}
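The comments above suggest comparing the two runtime modes by their printed output. A minimal sketch of that experiment, as a hypothetical companion class (not part of the original code): with the bounded fromElements source, STREAMING mode prints every intermediate count, e.g. (spark,1), (spark,2), ... up to (spark,4), while BATCH mode prints only the final totals (spark,4), (kafka,4), (flink,1).

package com.bigdata;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountModeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flip this line between BATCH and STREAMING and compare the printed output.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.fromElements("spark kafka flink", "spark spark spark", "kafka kafka kafka")
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String w : line.split(" ")) {
                        out.collect(Tuple2.of(w, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .sum(1)
                .print();
        env.execute("mode comparison");
    }
}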
Lambda expressions
package com.bigdata;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount_flink_02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        DataStreamSource<String> sourceDs = env.fromElements("spark kafka flink", "spark spark spark", "kafka kafka kafka");

        /*
         * Using lambda expressions:
         * 1. A lambda can stand in for any functional interface (an interface with exactly one unimplemented method).
         * 2. Syntax: (params) -> { body }
         * 3. The last expression can serve as the return value.
         *
         * The annoyance with the DataStream API is that generic lambdas lose their type
         * arguments to erasure, so a .returns(...) hint is needed; some methods can infer
         * the type on their own and do without it.
         */
        sourceDs
                .flatMap((String s, Collector<String> collector) -> {
                    String[] words = s.split(" ");
                    for (String word : words) {
                        collector.collect(word);
                    }
                })
                .returns(Types.STRING)
                .map((String word) -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy((Tuple2<String, Integer> tuple2) -> tuple2.f0)
                .sum(1)
                .print();

        env.execute("word count example");
    }
}
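Why .returns(...) is needed for some lambdas but not for anonymous classes: the compiler erases the lambda's generic output type, while an anonymous class keeps its type arguments in the class file, so Flink's type extraction can read them. A minimal sketch contrasting the two styles (a hypothetical demo class, not from the original code):

package com.bigdata;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReturnsDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Lambda: Tuple2<String, Integer> is erased, so Flink cannot infer the output
        // type on its own; without .returns(...) the job fails as soon as the type is needed.
        env.fromElements("spark", "kafka")
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .print();

        // Anonymous inner class: the type arguments survive compilation,
        // so no .returns(...) hint is necessary.
        env.fromElements("spark", "kafka")
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String word) {
                        return Tuple2.of(word, 1);
                    }
                })
                .print();

        env.execute("returns demo");
    }
}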
Passing parameters from outside
package com.bigdata;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Passing parameters in directly through args.
 */
public class WordCount_flink_04 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Parse all arguments passed on the command line
        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        // With raw args you would instead check args.length and read args[0] directly.
        // Load the data
        DataStreamSource<String> sourceDs;
        if (params.has("input")) {
            sourceDs = env.readTextFile(params.get("input"));
        } else {
            sourceDs = env.fromElements("spark kafka flink", "spark spark spark", "kafka kafka kafka");
        }

        // Transform the data
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDf = sourceDs
                .flatMap((String s, Collector<String> collector) -> {
                    String[] words = s.split(" ");
                    for (String word : words) {
                        collector.collect(word);
                    }
                })
                .returns(Types.STRING)
                .map((String word) -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy((Tuple2<String, Integer> tuple2) -> tuple2.f0)
                .sum(1);

        // Use the other parameter (writeAsText is deprecated in newer APIs in favor of FileSink, but fine for a demo)
        if (params.has("output")) {
            resultDf.writeAsText(params.get("output"), FileSystem.WriteMode.OVERWRITE)
                    // With parallelism 1 the sink writes a single file instead of a directory of part files
                    .setParallelism(1);
        } else {
            resultDf.print();
        }

        env.execute("word count example --input");
    }
}
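MultipleParameterTool works like ParameterTool but also allows the same key to appear more than once on the command line (ParameterTool keeps only one value per key). A minimal sketch of what it parses, using made-up paths:

package com.bigdata;

import org.apache.flink.api.java.utils.MultipleParameterTool;

public class ParamsDemo {
    public static void main(String[] args) {
        // Simulated command line: --input /data/a.txt --input /data/b.txt --output /data/out
        String[] demoArgs = {"--input", "/data/a.txt", "--input", "/data/b.txt", "--output", "/data/out"};
        MultipleParameterTool params = MultipleParameterTool.fromArgs(demoArgs);

        System.out.println(params.has("output"));              // true
        System.out.println(params.get("output"));              // /data/out
        System.out.println(params.getMultiParameter("input")); // both input paths are kept
    }
}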
Running the Flink jar from the command line

To run the jar on the cluster (passing parameters):

Option 1: named parameters

flink run -c com.bigdata.day01.WordCount02 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar \
    --input /xx --output /xx/xx/x

Option 2: positional parameters

flink run -c com.bigdata.day01.WordCount02 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar \
    /xx/xx/xx /xx/xxx

The -c flag specifies the class whose main method should run.
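Option 2 only works if the main method reads its arguments by position; WordCount_flink_04 above reads the named --input/--output flags and only hints at the positional style in a comment. A minimal sketch of such a positional variant, under a hypothetical class name:

package com.bigdata;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// Hypothetical positional-args variant: args[0] = input path, args[1] = output path, both optional.
public class WordCount_flink_05 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Check the argument count before indexing into args.
        DataStreamSource<String> sourceDs = args.length >= 1
                ? env.readTextFile(args[0])
                : env.fromElements("spark kafka flink", "spark spark spark");

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = sourceDs
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String w : line.split(" ")) {
                        out.collect(Tuple2.of(w, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .sum(1);

        if (args.length >= 2) {
            result.writeAsText(args[1], FileSystem.WriteMode.OVERWRITE).setParallelism(1);
        } else {
            result.print();
        }

        env.execute("word count example - positional args");
    }
}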