本文为您介绍Flink全托管支持的Queries语句详情。
Flink全托管兼容Apache Flink的Queries语句。以下BNF-grammar描述了支持的流批SQL特性的超集。
query:values| WITH withItem [ , withItem ]* query| {select| selectWithoutFrom| query UNION [ ALL ] query| query EXCEPT query| query INTERSECT query}[ ORDER BY orderItem [, orderItem ]* ][ LIMIT { count | ALL } ][ OFFSET start { ROW | ROWS } ][ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]withItem:name[ '(' column [, column ]* ')' ]AS '(' query ')'orderItem:expression [ ASC | DESC ]select:SELECT [ ALL | DISTINCT ]{ * | projectItem [, projectItem ]* }FROM tableExpression[ WHERE booleanExpression ][ GROUP BY { groupItem [, groupItem ]* } ][ HAVING booleanExpression ][ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]selectWithoutFrom:SELECT [ ALL | DISTINCT ]{ * | projectItem [, projectItem ]* }projectItem:expression [ [ AS ] columnAlias ]| tableAlias . *tableExpression:tableReference [, tableReference ]*| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]joinCondition:ON booleanExpression| USING '(' column [, column ]* ')'tableReference:tablePrimary[ matchRecognize ][ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]tablePrimary:[ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]| LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'| [ LATERAL ] '(' query ')'| UNNEST '(' expression ')'tablePath:[ [ catalogName . ] databaseName . ] tableNamesystemTimePeriod:FOR SYSTEM_TIME AS OF dateTimeExpressiondynamicTableOptions:/*+ OPTIONS(key=val [, key=val]*) */key:stringLiteralval:stringLiteralvalues:VALUES expression [, expression ]*groupItem:expression| '(' ')'| '(' expression [, expression ]* ')'| CUBE '(' expression [, expression ]* ')'| ROLLUP '(' expression [, expression ]* ')'| GROUPING SETS '(' groupItem [, groupItem ]* ')'windowRef:windowName| windowSpecwindowSpec:[ windowName ]'('[ ORDER BY orderItem [, orderItem ]* ][ PARTITION BY expression [, expression ]* ][RANGE numericOrIntervalExpression {PRECEDING}| ROWS numericExpression {PRECEDING}]')'matchRecognize:MATCH_RECOGNIZE '('[ PARTITION BY expression [, expression ]* ][ ORDER BY orderItem [, orderItem ]* ][ MEASURES measureColumn [, measureColumn ]* ][ ONE ROW PER MATCH ][ AFTER MATCH( SKIP TO NEXT ROW| SKIP PAST LAST ROW| SKIP TO FIRST variable| SKIP TO LAST variable| SKIP TO variable )]PATTERN '(' pattern ')'[ WITHIN intervalLiteral ]DEFINE variable AS condition [, variable AS condition ]*')'measureColumn:expression AS aliaspattern:patternTerm [ '|' patternTerm ]*patternTerm:patternFactor [ patternFactor ]*patternFactor:variable [ patternQuantifier ]patternQuantifier:'*'| '*?'| '+'| '+?'| '?'| '??'| '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']| '{' repeat '}'
标识符
对于标识符(表名,列名,函数名),Flink 采用了和Java相似的语法策略:
-
不管标识符是否被反引号标识,该标识符是大小写敏感的。
-
标识符的匹配是大小写敏感的。
和Java不同的是,Flink SQL支持标识符包含非英文或数字的字符,例如,以下是符合标准的。
SELECT a AS `my field` FROM t
字符串常量
Flink SQL使用单引号来表示字符串常量,而非使用双引号来表示,例如:
SELECT 'Hello World'
为了在字符串表示单引号,您可以使用两个单引号来转义。例如:
Flink SQL> SELECT 'Hello World', 'It''s me';
+-------------+---------+
| EXPR$0 | EXPR$1 |
+-------------+---------+
| Hello World | It's me |
+-------------+---------+
1 row in set
Flink SQL支持在字符串常量中包含unicode值,您可以通过以下方式声明:
-
使用反斜杠作为默认转义符
SELECT U&'\263A'
-
使用自定义符号作为转义符
SELECT U&'#263A' UESCAPE '#' -- 使用'#'作为转义符
Apache Flink V1.15 Queries语句详情如下表所示。
说明
如果您需要查看其它版本Queries语句,请注意切换到对应版本。
Queries语句 | 相关文档 |
Hints | SQL Hints |
WITH子句 | WITH clause |
SELECT与WHERE子句 | SELECT & WHERE clause |
SELECT DISTINCT | SELECT DISTINCT |
窗口函数 | Windowing table-valued functions (Windowing TVFs) |
窗口聚合 | Window Aggregation |
分组聚合 | Group Aggregation |
Over聚合 | Over Aggregation |
Join | Joins |
窗口关联 | Window Join |
集合操作 | Set Operations |
ORDER BY语句 | ORDER BY clause |
LIMIT语句 | LIMIT clause |
Top-N | Top-N |
窗口Top-N | Window Top-N |
去重 | Deduplication |
窗口去重 | Window Deduplication |
模式检测 | Pattern Recognition |
Query操作运行时信息说明
在流模式下,我们根据是否包含更新消息将处理的流数据分为更新流(包含更新消息)和非更新流(只包含INSERT类型消息的称为非更新流),例如CDC源就是Flink集成自外部的更新流,另外Query内部的一些操作也可能产生更新数据,如分组聚合(Group Aggregation)、Top-N计算等。能产生更新事件的操作通常会使用状态(State),我们一般将这类操作称为状态算子。值得注意的是,并非所有的状态算子都支持处理更新流,例如,Over聚合(Over Aggregation)和Interval Join目前还不支持将更新流作为输入。
以下表格信息基于VVR-6.0.x 及以上版本整理,包括了Query操作对应的运行时算子名称、算子是否使用了状态(State)、是否支持处理更新流、是否产生更新。
Query操作 | 对应运行时算子名称 | 是否使用状态(State) | 是否支持更新流 | 是否产生更新 | 说明 |
SELECT与WHERE | Calc | 否 | 是 | 否 | 无。 |
Lookup Join | LookupJoin | 否* | 是 | 否 | 在VVR-8.0.1及以上版本中设置作业参数‘table.optimizer.non-deterministic-update.strategy’为‘TRY_RESOLVE’且引擎检测到当前作业存在非确定性更新风险时,会自动启用状态(State)来消除非确定性,可以通过设置该参数为'IGNORE'强制关闭使用状态,注意修改该参数改变算子是否使用状态时,会导致作业状态不兼容,需要无状态启动作业。 |
Table Function | Correlate | 否 | 是 | 否 | 无。 |
SELECT DISTINCT | GroupAggregate | 是 | 是 | 是 | 无。 |
分组聚合(Group Aggregation) | GroupAggregate LocalGroupAggregate GlobalGroupAggregate IncrementalGroupAggregate | 是* | 是 | 是 | LocalGroupAggregate预聚合算子不会使用状态(State)。 |
Over聚合(Over Aggregation) | OverAggregate | 是 | 否 | 否 | 无。 |
窗口聚合(Window Aggregation) | GroupWindowAggregate WindowAggregate LocalWindowAggregate GlobalWindowAggregate | 是* | 是* | 否* |
|
双流Join(Regular Join) | Join | 是 | 是 | 是* | 当使用外连接类型时,例如LEFT、RIGHT、FULL OUTER Join会产生更新。 |
Interval Join | IntervalJoin | 是 | 否 | 否 | 无。 |
Temporal Join | TemporalJoin | 是 | 是 | 否 | 无。 |
窗口关联(Window Join) | WindowJoin | 是 | 否 | 否 | 无。 |
Top-N | Rank | 是 | 是 | 是 | Top-N不支持使用Processing Time字段作为排序键之一,请使用CURRENT_TIMESTAMP等其他内置函数进行排序。 警告 使用Processing Time字段作为Top-N的排序键之一会有数据错误问题。实时计算引擎VVR 8.0.7及以前版本语法检测不会报错,请您使用CURRENT_TIMESTAMP等其他内置函数替代。 |
窗口Top-N | WindowRank | 是 | 否 | 否 | 无。 |
去重(Deduplication) | Deduplicate | 是 | 否 | 是* | 基于处理时间(Proctime)使用first row去重时不会产生更新。 |
窗口去重(Window Deduplication) | WindowDeduplicate | 是 | 否 | 否 | 无。 |
说明
非状态算子仅会透传消息类型,并不会主动产生更新消息,即输出的消息类型和输入的消息类型保持一致;产生更新是指当输入为非更新流时也可能产生更新消息。