文章目录
- 一. 需求描述
- 二. 方案思路
- 1. 解决思路
- 2. flink json 解析
- 2.1. 通过json path解析非array数据
- 2.2. 通过json path解析array数据
- 3. CROSS JOIN逻辑
- 三. 方案实现
- 1. http json数据样例
- 2. flink sql 说明
一. 需求描述
flink消费http接口的数据,将json中的数组展开多行
如下样例数据以及要求处理的数据效果
{ "name": "John Doe", "age": 30, "address": { "street": { "street": "123 Main St", "city": "New York", "state": "NY" }, "city": "New York", "state": "NY" }, "phoneNumbers": [ { "type": "home", "number": "212-555-1234" }, { "type": "fax", "number": "646-555-4567" } ], "children": [], "spouse": null
}
name | age | street | city | state | phone_type | phone_number |
---|---|---|---|---|---|---|
John Doe | 30 | 123 Main St | New York | NY | home | 212-555-1234 |
John Doe | 30 | 123 Main St | New York | NY | fax | 646-555-4567 |
二. 方案思路
1. 解决思路
- flink 消费http接口的数据(json),发送到下游
- 下游算子解析json数据,当遇到数组时,算子解析返回array
- 通过使用CROSS JOIN 将数组数据拍平,如上表格展现
2. flink json 解析
2.1. 通过json path解析非array数据
如下通过flink内置函数:JSON_VALUE 进行数据解析,支持多种类型的输出,默认输出为string。
这里使用 cast转换,如下举例
cast(JSON_VALUE(json_string,'$.id') as int) as id ,
JSON_VALUE(json_string,'$.name') as name,
cast(JSON_VALUE(json_string,'$.details.age.real') as int) as `real` ,
JSON_VALUE(json_string,'$.details.address') as address,
2.2. 通过json path解析array数据
官网:目前JSON_QUERY虽然能够包装为array但实际上总是会返回为string,不符合要求。
如下:
<dependencies> <dependency> <groupId>com.jayway.jsonpath</groupId> <artifactId>json-path</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
通过udf解决
package com.dtstack.chunjun.local.test; import com.jayway.jsonpath.JsonPath;
import org.apache.flink.table.functions.ScalarFunction; import java.util.ArrayList;
import java.util.List; public class JsonArrayFieldExtractor extends ScalarFunction { public List<String> eval(String jsonString, String jsonPath) { if (jsonString == null || jsonString.isEmpty()) { return new ArrayList<String>(); } try { List<?> result = JsonPath.read(jsonString, jsonPath); List<String> stringList = new ArrayList<>(); for (Object obj : result) { stringList.add(obj.toString()); } return stringList; } catch (Exception e) { return new ArrayList<String>(); } } }
3. CROSS JOIN逻辑
Array Expansion
注意:CROSS JOIN 返回两个连接表的笛卡尔积,当有多个数组时会产生笛卡尔积。比如:两个数组,分别有100个元素,那么如果使用两次CROSS JOIN 则会产生1万行数据。
三. 方案实现
1. http json数据样例
{ "id": 1, "name": "Alice", "details": { "age": {"real":11}, "address": "123Mainst", "contacts": [ { "type": "email", "value": "alice@example.com" }, { "type": "phone", "value": "123-456-7890" } ], "grade": [ { "grade": [{"zz":11},{"zz":11}], "bb": {"rr":{"yy":"alice@example.com"}} }, { "grade": [{"zz":22}], "bb": {"rr":{"yy":"alice@example.com"}} } ] }
}
2. flink sql 说明
CREATE TEMPORARY SYSTEM FUNCTION get_json_array AS 'com.dtstack.chunjun.local.test.JsonArrayFieldExtractor';CREATE TABLE source
(json_string varchar
) WITH ('connector' = 'http-x','url' = 'http://localhost:8088/api/arraypage','intervalTime'= '3000','method'='get' --请求方式:get 、post,'decode'='text' -- 数据格式:只支持json模式-- 以下4个参数要同时存在:,'page-param-name'='pagenum' -- 多次请求参数1:分页参数名:例如:pageNum,'start-index'='1' -- 多次请求参数2:开始的位置,'end-index'='4' -- 多次请求参数3:结束的位置,'step'='1' -- 多次请求参数4:步长:默认值为1);CREATE TABLE sink
(id int,name varchar,`real` int,address varchar,zz int,yy varchar
) WITH ('connector' = 'print');insert into sink SELECTcast(JSON_VALUE(json_string,'$.id') as int) as id ,JSON_VALUE(json_string,'$.name') as name,cast(JSON_VALUE(json_string,'$.details.age.real') as int) as `real` ,JSON_VALUE(json_string,'$.details.address') as address,cast(`$.grade[*].grade[*].zz` as int ) as zz,`$.details.grade[*].bb.rr.yy` as yyFROM sourceCROSS JOIN UNNEST(get_json_array(json_string, '$.details.grade[*].grade[*].zz' )) AS T(`$.grade[*].grade[*].zz`)CROSS JOIN UNNEST(get_json_array(json_string, '$.details.grade[*].bb.rr.yy' )) AS T1(`$.details.grade[*].bb.rr.yy`);--{
-- "id": 1,
-- "name": "Alice",
-- "details": {
-- "age": {"real":11},
-- "address": "123Mainst",
-- "contacts": [
-- {
-- "type": "email",
-- "value": "alice@example.com"
-- },
-- {
-- "type": "phone",
-- "value": "123-456-7890"
-- }
-- ],
-- "grade": [
-- {
-- "grade": [{"zz":11},{"zz":11}],
-- "bb": {"rr":{"yy":"alice@example.com"}}
-- },
-- {
-- "grade": [{"zz":22}],
-- "bb": {"rr":{"yy":"alice@example.com"}}
-- }
-- ]
-- }
--}
消费结果
具体逻辑描述
-
http连接器消费http接口数据 具体使用chunjun的http连接器,相关代码见:我提供的相关pr:
[feature-DTStack#1775][connector][http] http supports offline mode -
使用JSON_VALUE、get_json_array解析为string和
array<string>
,之后使用cast进行类型转换 -
CROSS JOIN 生成笛卡尔积