
Exactly-once consumption with Flink: Kafka to Kafka and Kafka to Doris

Asked by user 在路上 173873 on 2023-10-11 08:48:04. Views: 73

Best answer

1 Flow diagram

2 Flink source table definitions

-- Source: city topic
CREATE TABLE NJ_QL_JC_SSJC_SOURCE (
  record STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'QL_JC_SSJC',
  'properties.bootstrap.servers' = '172.*.*.*:9092',
  'properties.group.id' = 'QL_JC_SSJC_NJ_QL_JC_SSJC_SOURCE',
  -- Resume from the committed group offsets; fall back to earliest if none exist
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest',
  -- Read only messages from committed transactions
  'properties.isolation.level' = 'read_committed',
  'format' = 'raw'
);
-- Source: middle-platform Kafka topic
CREATE TABLE ODS_QL_JC_SSJC_SOURCE (
  sscsdm STRING,
  extract_time TIMESTAMP,
  record STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'ODS_QL_JC_SSJC',
  'properties.bootstrap.servers' = '172.*.*.*:21007,172.*.*.*:21007,172.*.*.*:21007',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.kerberos.domain.name' = 'hadoop.hadoop.com',
  'properties.group.id' = 'ODS_QL_JC_SSJC_SOURCE_ODS_QL_JC_SSJC_SOURCE',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest',
  -- Read only messages from committed transactions
  'properties.isolation.level' = 'read_committed',
  -- 'sink.semantic' is a sink-side option; it has no effect when this table is only read from
  'sink.semantic' = 'exactly-once',
  'format' = 'json'
);
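The query in section 4 also reads from WX_QL_JC_SSJC_SOURCE and KS_QL_JC_SSJC_SOURCE, whose definitions are not shown here. Presumably each city source follows the same pattern as NJ_QL_JC_SSJC_SOURCE. The following is only a sketch under that assumption: the topic name, broker address and group id are hypothetical placeholders, not values from the original job.

```sql
-- Sketch of one more city source table, modeled on NJ_QL_JC_SSJC_SOURCE.
-- Topic, broker address and group id below are hypothetical placeholders.
CREATE TABLE WX_QL_JC_SSJC_SOURCE (
  record STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<city-topic>',                                   -- placeholder
  'properties.bootstrap.servers' = '<city-broker-host>:9092', -- placeholder
  'properties.group.id' = '<city-consumer-group>',            -- placeholder
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest',
  'properties.isolation.level' = 'read_committed',
  'format' = 'raw'
);
```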

3 Flink sink table definitions

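-- Sink: middle-platform Kafka topic
CREATE TABLE KAFKA_ODS_QL_JC_SSJC_SINK (
  sscsdm STRING,
  extract_time TIMESTAMP,
  record STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'ODS_QL_JC_SSJC',
  'properties.bootstrap.servers' = '172.*.*.*:21007,172.*.*.*:21007,172.*.*.*:21007',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.kerberos.domain.name' = 'hadoop.hadoop.com',
  'format' = 'json',
  -- Without this option the Kafka sink defaults to at-least-once.
  -- Newer connector versions use 'sink.delivery-guarantee' (plus
  -- 'sink.transactional-id-prefix') in place of 'sink.semantic'.
  'sink.semantic' = 'exactly-once',
  -- Must not exceed the broker's transaction.max.timeout.ms
  'properties.transaction.timeout.ms' = '900000'
);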
-- Sink: Doris table
CREATE TABLE DORIS_ODS_QL_JC_SSJC_SINK (
  sscsdm STRING,
  extract_time TIMESTAMP,
  record STRING
) WITH (
  'connector' = 'doris',
  'fenodes' = '3.*.*.*:8030,3.*.*.*:8030,3.*.*.*:8030',
  'table.identifier' = 'doris_d.ods_ql_jc_ssjc',
  'username' = 'root',
  'password' = '********',
  -- Enable two-phase commit on Stream Load
  'sink.properties.two_phase_commit' = 'true'
);
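The Doris sink writes into doris_d.ods_ql_jc_ssjc, which must already exist on the Doris side. Its definition is not given in the original, so the following is only a sketch of what a matching table could look like. The column lengths, the DUPLICATE KEY model, the bucket count and the replica count are all assumptions.

```sql
-- Run in Doris (not in Flink). Hypothetical target table for the sink above.
CREATE TABLE IF NOT EXISTS doris_d.ods_ql_jc_ssjc (
    sscsdm       VARCHAR(16),   -- length is an assumption
    extract_time DATETIME,
    record       STRING
)
DUPLICATE KEY(sscsdm, extract_time)        -- assumed data model
DISTRIBUTED BY HASH(sscsdm) BUCKETS 10     -- assumed bucketing
PROPERTIES ("replication_num" = "3");      -- assumed replica count
```

The DUPLICATE KEY model keeps every row as written, which suits a raw ODS layer. A UNIQUE KEY model would be the choice if rows should be overwritten by key instead.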

4 Flink SQL: city topics to the middle-platform topic

INSERT INTO KAFKA_ODS_QL_JC_SSJC_SINK
SELECT '320100' AS sscsdm, CURRENT_TIMESTAMP AS extract_time, record
FROM NJ_QL_JC_SSJC_SOURCE
UNION ALL
SELECT '320200' AS sscsdm, CURRENT_TIMESTAMP AS extract_time, record
FROM WX_QL_JC_SSJC_SOURCE
...
UNION ALL
SELECT '320583' AS sscsdm, CURRENT_TIMESTAMP AS extract_time, record
FROM KS_QL_JC_SSJC_SOURCE
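Exactly-once delivery to Kafka depends on Flink checkpointing: the transactional producer commits its open transaction only when a checkpoint completes, and downstream read_committed consumers see the data only after that commit. The original does not show the job's checkpoint settings. The statements below are a minimal sketch for the Flink SQL client, with interval and timeout values chosen purely as assumptions. The interval should stay well below the 900000 ms transaction timeout set on the sink table, otherwise a transaction may expire before it is committed.

```sql
-- Assumed values; tune for the actual job. Run before the INSERT statement.
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.interval' = '60s';   -- transactions commit once per checkpoint
SET 'execution.checkpointing.timeout' = '10min';  -- keep below transaction.timeout.ms (15 min)
```

A shorter interval lowers end-to-end latency for read_committed consumers but creates more Kafka transactions.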

5 Flink SQL: middle-platform topic to Doris

INSERT INTO DORIS_ODS_QL_JC_SSJC_SINK
SELECT sscsdm, CURRENT_TIMESTAMP AS extract_time, record
FROM ODS_QL_JC_SSJC_SOURCE
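Once the job has completed at least one checkpoint, the loaded rows can be inspected on the Doris side. The query below is a simple sanity check and not part of the original answer. It assumes the target table doris_d.ods_ql_jc_ssjc has the three columns declared in the sink definition.

```sql
-- Run in Doris: row count and latest load time per city code.
SELECT sscsdm,
       COUNT(*)          AS row_count,
       MAX(extract_time) AS latest_extract_time
FROM doris_d.ods_ql_jc_ssjc
GROUP BY sscsdm
ORDER BY sscsdm;
```

Comparing these counts against the number of messages in the ODS_QL_JC_SSJC topic gives a rough indication of whether records were lost or duplicated.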
