当前位置: 首页> 科技> 名企 > 深圳防疫最新情况实时_app制作多少钱一个_google官网下载安装_什么叫关键词举例

深圳防疫最新情况实时_app制作多少钱一个_google官网下载安装_什么叫关键词举例

时间:2025/7/30 11:10:08来源:https://blog.csdn.net/weixin_63297999/article/details/144036550 浏览次数:0次
深圳防疫最新情况实时_app制作多少钱一个_google官网下载安装_什么叫关键词举例

map算子的使用

假如有如下数据:

86.149.9.216 10001 17/05/2015:10:05:30 GET /presentations/logstash-monitorama-2013/images/github-contributions.png
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:06:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:07:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:08:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:09:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:10:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:16:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:26:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css

问题:将其转换为一个LogBean对象,并输出。

读取本地文件,使用如下方式

DataStream<String> lines = env.readTextFile("./data/input/flatmap.log");

字段名定义为:

        String ip;      // 访问ipint userId;     // 用户idlong timestamp; // 访问时间戳String method;  // 访问方法String path;    // 访问路径

 假如需要用到日期工具类,可以导入lang3包

        <dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency>

代码如下: 

package com.bigdata.day02;import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.text.SimpleDateFormat;
import java.util.Date;@Data
@AllArgsConstructor
class LogBean{private String ip;      // 访问ipprivate int userId;     // 用户idprivate long timestamp; // 访问时间戳private String method;  // 访问方法private String path;    // 访问路径
}
public class Demo04 {// 将数据转换为javaBeanpublic static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> streamSource = env.readTextFile("datas/a.log");//3. transformation-数据处理转换SingleOutputStreamOperator<LogBean> map = streamSource.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String line) throws Exception {String[] arr = line.split("\\s+");//时间戳转换  17/05/2015:10:06:53String time = arr[2];SimpleDateFormat format = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");Date date = format.parse(time);long timeStamp = date.getTime();return new LogBean(arr[0],Integer.parseInt(arr[1]),timeStamp,arr[3],arr[4]);}});//4. sink-数据输出map.print();//5. execute-执行env.execute();}
}

FlatMap算子的使用练习

flatmap的作用是将DataStream中的每一个元素转换为0...n个元素

读取flatmap.log文件中的数据:

张三,苹果手机,联想电脑,华为平板
李四,华为手机,苹果电脑,小米平板

将数据转换为:

张三有苹果手机
张三有联想电脑
张三有华为平板
李四有…
…
…

 代码如下:

package com.bigdata.day03;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlatMapDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据//2. source-加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\flatmap.log");//3. transformation-数据处理转换DataStream<String> flatMapStream = fileStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {//张三,苹果手机,联想电脑,华为平板String[] arr = line.split(",");String name = arr[0];for (int i = 1; i < arr.length; i++) {String goods = arr[i];collector.collect(name+"有"+goods);}}});//4. sink-数据输出flatMapStream.print();//5. execute-执行env.execute();}
}

 

Filter的使用

对数据进行过滤。

读取第一题中 a.log文件中的访问日志数据,过滤出来以下访问IP是83.149.9.216的访问日志

代码演示如下:

package com.bigdata.day02;import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.text.SimpleDateFormat;
import java.util.Date;public class Demo06 {// 将数据转换为javaBeanpublic static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> streamSource = env.readTextFile("datas/a.log");//3. transformation-数据处理转换//读取第一题中 a.log文件中的访问日志数据,过滤出来以下访问IP是83.149.9.216的访问日志streamSource.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String line) throws Exception {String[] arr = line.split(" ");String ip = arr[0];return ip.equals("83.149.9.216");}}).writeAsText("datas/b.log", FileSystem.WriteMode.OVERWRITE).setParallelism(1);//4. sink-数据输出//5. execute-执行env.execute();}
}

KeyBy

对数据进行分组,分组后的数据进入同一个分区。

流处理中没有groupBy,而是keyBy

KeySelector对象可以支持元组类型,也可以支持POJO[Entry、JavaBean]

元组类型

单个字段keyBy

//用字段位置(已经被废弃)
wordAndOne.keyBy(0)//用字段表达式
wordAndOne.keyBy(v -> v.f0)

 多个字段keyBy

//用字段位置
wordAndOne.keyBy(0, 1);//用KeySelector
wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {return Tuple2.of(value.f0, value.f1);}
});

类似于sql中的group by:

select sex,count(1) from student group by sex;
group by 后面也可以跟多个字段进行分组,同样 keyBy 也支持使用多个列进行分组

 POJO

类似下面的类就是pojo

public class PeopleCount {private String province;private String city;private Integer counts;public PeopleCount() {}//省略其他代码。。。
}

单个字段keyBy

source.keyBy(a -> a.getProvince());

多个字段keyBy:

source.keyBy(new KeySelector<PeopleCount, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> getKey(PeopleCount value) throws Exception {return Tuple2.of(value.getProvince(), value.getCity());}
});

例如:

假如有如下数据:
env.fromElements(Tuple2.of("篮球", 1),Tuple2.of("篮球", 2),Tuple2.of("篮球", 3),Tuple2.of("足球", 3),Tuple2.of("足球", 2),Tuple2.of("足球", 3));
求:篮球多少个,足球多少个?

代码演示:

package com.bigdata.day02;import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo07 {@Data@AllArgsConstructorstatic class Ball{private String ballName;private int num;}public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据//3. transformation-数据处理转换//4. sink-数据输出DataStreamSource<Tuple2<String, Integer>> tuple2DataStreamSource = env.fromElements(Tuple2.of("篮球", 1),Tuple2.of("篮球", 2),Tuple2.of("篮球", 3),Tuple2.of("足球", 3),Tuple2.of("足球", 2),Tuple2.of("足球", 3));// 这个写法已经废弃,0 代表的是按照元组的第一个元素进行分组,相同的组进入到相同的编号中KeyedStream<Tuple2<String, Integer>, Tuple> tuple2TupleKeyedStream = tuple2DataStreamSource.keyBy(0);tuple2TupleKeyedStream.print();// 这个写法是目前提倡的写法// 使用了lambda表达式,因为这个算子后面不需要写returns 所以看着比较简介tuple2DataStreamSource.keyBy(v -> v.f0).print();// 这个是原始写法,没有简化tuple2DataStreamSource.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});// 以上的写法是针对数据是二元组的格式,以下演示数据是pojoDataStreamSource<Ball> ballSource = env.fromElements(new Ball("篮球", 1),new Ball("篮球", 2),new Ball("篮球", 3),new Ball("足球", 3),new Ball("足球", 2),new Ball("足球", 3));ballSource.keyBy(ball -> ball.getBallName()).print();ballSource.keyBy(new KeySelector<Ball, String>() {@Overridepublic String getKey(Ball ball) throws Exception {return ball.getBallName();}});//5. execute-执行env.execute();}
}

Reduce

--sum的底层是reduce

可以对一个dataset 或者一个 group 来进行聚合计算,最终聚合成一个元素

读取a.log日志,统计ip地址访问pv数量,使用reduce 操作聚合成一个最终结果

结果类似:

(86.149.9.216,1)

(10.0.0.1,7)

(83.149.9.216,6)

代码演示:

package com.bigdata.day02;import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.text.SimpleDateFormat;
import java.util.Date;public class Demo08 {// 将数据转换为javaBeanpublic static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> streamSource = env.readTextFile("datas/a.log");//3. transformation-数据处理转换KeyedStream<Tuple2<String, Integer>, String> keyBy = streamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] arr = value.split(" ");return Tuple2.of(arr[0], 1);}}).keyBy(v -> v.f0);// 不使用reduce的情况,本质上sum的底层是agg,agg的底层是reduce//keyBy.sum(1).print();// 将相同的IP 已经放入到了同一个组中,接着就开始汇总了。keyBy.reduce(new ReduceFunction<Tuple2<String, Integer>>() {// 第一个v1 代表汇总过的二元组,第二个v2 ,代表 当前分组中的一个二元组@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) throws Exception {return Tuple2.of(v1.f0,v1.f1 + v2.f1);}}).print();// 简化版keyBy.reduce(( v1, v2) -> Tuple2.of(v1.f0,v1.f1 + v2.f1)).print();//5. execute-执行env.execute();}
}

Union和connect-合并和连接

Union

union可以合并多个同类型的流

将多个DataStream 合并成一个DataStream

【注意】:union合并的DataStream的类型必须是一致的

注意:union可以取并集,但是不会去重

connect

connect可以连接2个不同类型的流(最后需要处理后再输出)

DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化【一国两制】,两个流相互独立, 作为对比Union后是真的变成一个流了。

和union类似,但是connect只能连接两个流,两个流之间的数据类型可以同,对两个流的数据可以分别应用不同的处理逻辑.

 Side Outputs

侧道输出(侧输出流) --可以分流

对流中的数据按照奇数和偶数进行分流,并获取分流后的数据
代码演示:

package com.bigdata.day02;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class Demo11 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 侧道输出流DataStreamSource<Long> streamSource = env.fromSequence(0, 100);// 定义两个标签OutputTag<Long> tag_even = new OutputTag<Long>("偶数", TypeInformation.of(Long.class));OutputTag<Long> tag_odd = new OutputTag<Long>("奇数", TypeInformation.of(Long.class));//2. source-加载数据SingleOutputStreamOperator<Long> process = streamSource.process(new ProcessFunction<Long, Long>() {@Overridepublic void processElement(Long value, ProcessFunction<Long, Long>.Context ctx, Collector<Long> out) throws Exception {// value 代表每一个数据if (value % 2 == 0) {ctx.output(tag_even, value);} else {ctx.output(tag_odd, value);}}});// 从数据集中获取奇数的所有数据DataStream<Long> sideOutput = process.getSideOutput(tag_odd);sideOutput.print("奇数:");// 获取所有偶数数据DataStream<Long> sideOutput2 = process.getSideOutput(tag_even);sideOutput2.print("偶数:");//3. transformation-数据处理转换//4. sink-数据输出//5. execute-执行env.execute();}
}

关键字:深圳防疫最新情况实时_app制作多少钱一个_google官网下载安装_什么叫关键词举例

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: