Flink WordCount: pom.xml setup, anonymous inner classes, lambda expressions, and external parameters

pom.xml

<properties>
    <flink.version>1.13.6</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>2.7.5-10.0</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
    </dependency>
</dependencies>

<build>
    <extensions>
        <extension>
            <groupId>org.apache.maven.wagon</groupId>
            <artifactId>wagon-ssh</artifactId>
            <version>2.8</version>
        </extension>
    </extensions>
    <plugins>
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>wagon-maven-plugin</artifactId>
            <version>1.0</version>
            <configuration>
                <!-- Local path of the jar to upload -->
                <fromFile>target/${project.build.finalName}.jar</fromFile>
                <!-- Remote destination to copy it to -->
                <url>scp://root:root@bigdata01:/opt/app</url>
            </configuration>
        </plugin>
    </plugins>
</build>
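With the wagon extension and plugin configured like this, the jar can be built and copied to bigdata01 in one step. A minimal sketch of the invocation, assuming the wagon-maven-plugin's upload-single goal (the host and target directory come from the url element above):

mvn clean package wagon:upload-single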

Basic version + anonymous inner classes

package com.bigdata;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount_flink_01 {
    public static void main(String[] args) throws Exception {
        // This works unchanged whether you run locally or on a cluster, which is very convenient
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
         * 1. AUTOMATIC: picks batch or streaming based on whether the source is bounded
         *    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
         * 2. BATCH: computes the result in one pass over the data
         *    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         * 3. STREAMING: the default
         *    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
         */
        // Streaming is the default; printing the results under batch vs. streaming
        // is a good way to get a feel for the difference between the two
        // env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        // Set the parallelism (it can also be set per operator)
        env.setParallelism(2);

        // Load the data
        DataStreamSource<String> sourceDs = env.fromElements(
                "spark kafka flink", "spark spark spark", "kafka kafka kafka");

        // Transform the data
        SingleOutputStreamOperator<Tuple2<String, Integer>> rsDs = sourceDs
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String s, Collector<String> collector) throws Exception {
                        String[] words = s.split(" ");
                        for (String word : words) {
                            collector.collect(word);
                        }
                    }
                })
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String word) throws Exception {
                        return Tuple2.of(word, 1);
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                        // Group by the first field of the tuple
                        return tuple2.f0;
                    }
                })
                .sum(1);

        // Don't rely on System.out here; when submitted as a jar, the output
        // appears in the task manager pages of the web UI on port 8081
        rsDs.print();

        env.execute("word count example");
    }
}
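To make the batch/streaming distinction concrete, this is roughly what print() produces in each mode for the three input lines. A sketch only: the n> subtask prefixes and the interleaving depend on parallelism and scheduling.

// STREAMING: the keyed sum emits an updated count for every incoming record
1> (spark,1)
2> (kafka,1)
1> (flink,1)
1> (spark,2)
1> (spark,3)
1> (spark,4)
2> (kafka,2)
2> (kafka,3)
2> (kafka,4)

// BATCH: only the final count per key is emitted
1> (spark,4)
2> (kafka,4)
1> (flink,1)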

Lambda expressions

package com.bigdata;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount_flink_02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        DataStreamSource<String> sourceDs = env.fromElements(
                "spark kafka flink", "spark spark spark", "kafka kafka kafka");

        /*
         * Using lambda expressions:
         * 1. They work for functional interfaces (interfaces with a single unimplemented method).
         * 2. Syntax: () -> {}
         * 3. The last expression can serve as the return value.
         *
         * The annoyance with lambdas on a DataStream is that Java's type erasure
         * hides the output type, so returns(...) hints are often required,
         * while anonymous inner classes (and some methods) don't need them.
         */
        sourceDs
                .flatMap((String s, Collector<String> collector) -> {
                    String[] words = s.split(" ");
                    for (String word : words) {
                        collector.collect(word);
                    }
                }).returns(Types.STRING)
                .map((String word) -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy((Tuple2<String, Integer> tuple2) -> tuple2.f0)
                .sum(1)
                .print();

        env.execute("word count example");
    }
}
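As an aside beyond the original code: the same type hint can be expressed with a TypeHint instead of Types.TUPLE(...), which some find easier to read. A sketch of the drop-in replacement for the map(...).returns(...) pair above:

import org.apache.flink.api.common.typeinfo.TypeHint;

// The anonymous TypeHint subclass preserves the generic type through erasure
.map((String word) -> Tuple2.of(word, 1))
.returns(new TypeHint<Tuple2<String, Integer>>() {})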

Passing parameters in from outside

package com.bigdata;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Pass parameters in directly through args.
 */
public class WordCount_flink_04 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Parse all command-line arguments
        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        DataStreamSource<String> sourceDs = null;
        // With raw args you would check args.length and read args[0] instead
        // Load the data
        if (params.has("input")) {
            sourceDs = env.readTextFile(params.get("input"));
        } else {
            sourceDs = env.fromElements(
                    "spark kafka flink", "spark spark spark", "kafka kafka kafka");
        }

        // Transform the data
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDf = sourceDs
                .flatMap((String s, Collector<String> collector) -> {
                    String[] words = s.split(" ");
                    for (String word : words) {
                        collector.collect(word);
                    }
                }).returns(Types.STRING)
                .map((String word) -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy((Tuple2<String, Integer> tuple2) -> tuple2.f0)
                .sum(1);

        // Use the second parameter, if present
        if (params.has("output")) {
            resultDf.writeAsText(params.get("output"), FileSystem.WriteMode.OVERWRITE)
                    // With parallelism 1 the sink writes a single file instead of a directory of part files
                    .setParallelism(1);
        } else {
            resultDf.print();
        }

        env.execute("word count example --input");
    }
}
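For reference, a minimal sketch of how MultipleParameterTool interprets the flags. The paths here are made up; getMultiParameter is what distinguishes it from the plain ParameterTool, since it keeps every value of a repeated flag:

MultipleParameterTool params = MultipleParameterTool.fromArgs(new String[]{
        "--input", "/data/a.txt", "--input", "/data/b.txt", "--output", "/data/out"});
params.has("input");                // true
params.get("output");               // "/data/out"
params.getMultiParameter("input");  // collection of ["/data/a.txt", "/data/b.txt"]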

Running the Flink jar from the command line

Run the jar on the cluster (passing parameters):

Option 1 (named flags, read with MultipleParameterTool):
flink run -c com.bigdata.day01.WordCount02 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar \
    --input /xx --output /xx/xx/x

Option 2 (positional arguments, read from args by index):
flink run -c com.bigdata.day01.WordCount02 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar \
    /xx/xx/xx /xx/xxx

-c specifies the class whose main method should run.
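With option 2 the job itself has to read the positional values. A minimal sketch of that check, assumed from the args[0] comment in the code above rather than spelled out in the original post:

// Inside main(String[] args): fall back to the demo data when no path is given
String input  = args.length > 0 ? args[0] : null;
String output = args.length > 1 ? args[1] : null;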
