Common Flink SQL Job Statements
Asked by user 在路上 170870 on 2023-11-05 17:45:21
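Contents
- Common Flink SQL configuration
- Kafka source to MySQL sink
- Window functions
- datagen: auto-generated test table
- TUMBLE: tumbling window
- HOP: sliding window
- CUMULATE: cumulative window
- GROUPING SETS: multi-dimensional analysis
- OVER function
- Top-N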
Common Flink SQL configuration
Set the result display format of the SQL client:
SET 'sql-client.execution.result-mode' = 'tableau';
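Besides tableau, the SQL client also accepts the table and changelog result modes. The lines below are a minimal sketch of switching between them and inspecting the session, assuming a Flink version that supports the quoted SET syntax (1.13 or later).

```sql
-- Assumes the quoted SET syntax available in Flink 1.13+.
-- Paged, continuously refreshed table view:
SET 'sql-client.execution.result-mode' = 'table';
-- Raw changelog view showing insert/retract rows:
SET 'sql-client.execution.result-mode' = 'changelog';
-- Plain printed table, convenient for streaming demos:
SET 'sql-client.execution.result-mode' = 'tableau';
-- List every property currently set in the session:
SET;
```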
Kafka source to MySQL sink
Kafka
topic: bop_log_realtime
Record structure:
{"timestamp":"2023-10-31 14:26:02.528","serverip":"10.13.177.209","level":"INFO","servicename":"bop-fms-query-info","traceid":"","spanid":"","parent":"","message":"Resolving eureka endpoints via configuration"}
MySQL table:
Database: flink_test
-- MySQL: target table
CREATE TABLE `bop_log_realtime_warning` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `serverip` varchar(255) NOT NULL DEFAULT '',
  `timestamp` varchar(255) NOT NULL DEFAULT '',
  `level` varchar(255) NOT NULL DEFAULT '',
  `servicename` varchar(255) NOT NULL DEFAULT '',
  `traceid` varchar(255) NOT NULL DEFAULT '',
  `spanid` varchar(255) NOT NULL DEFAULT '',
  `parent` varchar(255) NOT NULL DEFAULT '',
  `message` varchar(255) NOT NULL DEFAULT '',
  `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Flink: source table reading JSON records from Kafka
CREATE TABLE kafka_log_realtime_json (
  `serverip` STRING,
  `timestamp` STRING,
  `level` STRING,
  `servicename` STRING,
  `traceid` STRING,
  `spanid` STRING,
  `parent` STRING,
  `message` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'bop_log_realtime',
  'properties.bootstrap.servers' = '10.2.25.221:9092,10.2.25.221:9093',
  'properties.group.id' = 'testGroup2',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset'
);

-- Flink: sink table mapped to the MySQL table through the JDBC connector
CREATE TABLE bop_log_realtime_warning (
  `serverip` STRING,
  `timestamp` STRING,
  `level` STRING,
  `servicename` STRING,
  `traceid` STRING,
  `spanid` STRING,
  `parent` STRING,
  `message` STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://m3309i.hebe.grid.xx.com.cn:3309/flink_test?zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8&useSSL=false&autoReconnect=true&serverTimezone=Asia/Shanghai',
  'username' = 'super_mis',
  'password' = 'mis_password',
  'table-name' = 'bop_log_realtime_warning'
);

-- Continuously copy every Kafka record into MySQL
INSERT INTO bop_log_realtime_warning
SELECT
  `serverip`,
  `timestamp`,
  `level`,
  `servicename`,
  `traceid`,
  `spanid`,
  `parent`,
  `message`
FROM kafka_log_realtime_json;
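The sink table name suggests that only warning-level records are of interest, whereas the insert statement above copies every record. A filtered variant might look like the sketch below. The level values 'WARN' and 'ERROR' are assumptions, since the sample record only shows 'INFO'.

```sql
-- Hypothetical variant: keep only warning-and-above records.
-- 'WARN' and 'ERROR' are assumed level values, not taken from the source data.
INSERT INTO bop_log_realtime_warning
SELECT
  `serverip`,
  `timestamp`,
  `level`,
  `servicename`,
  `traceid`,
  `spanid`,
  `parent`,
  `message`
FROM kafka_log_realtime_json
WHERE `level` IN ('WARN', 'ERROR');
```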
Window functions
datagen: auto-generated test table
CREATE TABLE ws (
  id INT,
  vc INT,
  pt AS PROCTIME(), -- processing time
  et AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)), -- event time
  WATERMARK FOR et AS et - INTERVAL '5' SECOND -- watermark
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.id.min' = '1',
  'fields.id.max' = '3',
  'fields.vc.min' = '1',
  'fields.vc.max' = '100'
);
TUMBLE: tumbling window
Tumbling window with a window size of 5 seconds.
SELECT
  id,
  SUM(vc) AS vcSum,
  window_start,
  window_end
FROM TABLE(
  TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECOND))
GROUP BY id, window_start, window_end;
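HOP: sliding window
Sliding window with a 5-second slide and a 10-second window size.
Note: the window size must be an integer multiple of the slide (internally the window is optimized into several smaller tumbling windows).
SELECT
  id,
  SUM(vc) AS vcSum,
  window_start,
  window_end
FROM TABLE(
  HOP(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND))
GROUP BY id, window_start, window_end;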
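CUMULATE: cumulative window
Cumulative window with a 5-second step. CUMULATE takes both a step and a maximum window size; the query assumes a 10-second maximum.
Note: the maximum window size must be an integer multiple of the cumulate step.
SELECT
  id,
  SUM(vc) AS vcSum,
  window_start,
  window_end
FROM TABLE(
  -- step = 5 s; the 10 s maximum window size is an assumed value
  CUMULATE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND))
GROUP BY id, window_start, window_end;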
GROUPING SETS: multi-dimensional analysis
SELECT
  id,
  SUM(vc) AS vcSum,
  window_start,
  window_end
FROM TABLE(
  TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECOND))
GROUP BY window_start, window_end, GROUPING SETS ((id));
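With a single grouping set (id), the query above behaves like an ordinary GROUP BY on id. Multi-dimensional analysis becomes visible once more than one set is listed. As a minimal sketch on the same ws table, adding the empty set () should also produce a grand total for each window:

```sql
-- Sketch: per-id sums plus one grand-total row for each 5-second window.
-- In the grand-total rows, id is NULL.
SELECT
  window_start,
  window_end,
  id,
  SUM(vc) AS vcSum
FROM TABLE(
  TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECOND))
GROUP BY window_start, window_end, GROUPING SETS ((id), ());
```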
OVER function
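A minimal sketch of OVER aggregations on the ws table defined earlier. Unlike the window queries, an OVER aggregation emits one result per input row. The 10-second range and the 5-row range are illustrative assumptions, not values from the original answer.

```sql
-- Time-range OVER window: per id, sum vc over the last 10 seconds of event time.
SELECT
  id,
  et,
  vc,
  SUM(vc) OVER (
    PARTITION BY id
    ORDER BY et
    RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
  ) AS sumVC
FROM ws;

-- Row-count OVER window: per id, sum vc over the current row and the 5 rows before it.
SELECT
  id,
  et,
  vc,
  SUM(vc) OVER (
    PARTITION BY id
    ORDER BY et
    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  ) AS sumVC
FROM ws;
```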
Top-N
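A minimal sketch of the usual Flink Top-N pattern on the ws table defined earlier: rank rows with ROW_NUMBER() in a subquery, then filter on the rank in the outer query. The choice of N = 3 and ordering by vc are illustrative assumptions.

```sql
-- Sketch: for each id, keep the 3 rows with the largest vc seen so far.
-- The result is an updating table: rows are retracted when they drop out of the top 3.
SELECT id, et, vc, rownum
FROM (
  SELECT
    id,
    et,
    vc,
    ROW_NUMBER() OVER (PARTITION BY id ORDER BY vc DESC) AS rownum
  FROM ws
)
WHERE rownum <= 3;
```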