用 flink 插件chunjun实现全量+增量同步-达梦数据库到postgresql
最佳答案 问答题库628位专家为你答疑解惑
用 flink 插件chunjun实现全量+增量同步,这里以达梦数据库同步到postgresql数据库为例。
纯钧下载地址:纯钧
纯钧是一款稳定、易用、高效、批流一体的数据集成框架,目前基于实时计算引擎Flink实现多种异构数据源之间的数据同步与计算,已在上千家公司部署且稳定运行。
达梦表ddl:
CREATE TABLE SYSDBA.SOURCE_TABLE (ID INT NOT NULL,NAME VARCHAR(100),CREATE_TIME INT,CONSTRAINT PK_SOURCE_TABLE_ID PRIMARY KEY (ID)
);
CREATE UNIQUE INDEX INDEX33555468 ON SYSDBA.SOURCE_TABLE (ID);
postgresql ddl:
CREATE TABLE public.SINK_TABLE (id int4 NOT NULL,"name" varchar(100) NULL,create_time int4 NULL,CONSTRAINT pk_SINK_TABLE_id2 PRIMARY KEY (id)
);
纯钧的sql:
create table SOURCE_TABLE(ID INT, NAME varchar(200),CREATE_TIME INT)
with ('connector' = 'dm-x','url' = 'jdbc:dm://11.0.24.107:5236','schema' = 'SYSDBA','table-name' = 'SOURCE_TABLE','username' = 'SYSDBA','password' = 'SYSDBA001','scan.increment.column' = 'CREATE_TIME','scan.increment.column-type' = 'int','scan.polling-interval' = '3000','scan.fetch-size' = '200','scan.query-timeout' = '10'
);
CREATE TABLE SINK_TABLE (id INT,name varchar(200),create_time INT,PRIMARY KEY (id) NOT ENFORCED)with ('password'='sys','connector'='postgresql-x','sink.buffer-flush.interval'='1000','sink.all-replace'='true','sink.buffer-flush.max-rows'='100','table-name'='SINK_TABLE','sink.parallelism'='1','url'='jdbc:postgresql://11.0.101.10:39001/sys','username'='sys');
insert into SINK_TABLE select ID,NAME,CREATE_TIME from SOURCE_TABLE;
原理就是根据create_time这个字段的更新而增量更新修改、添加操作。
参数解释:
,'scan.increment.column' = 'create_time' -- 增量字段,根据这个字段判断是否更新
,'scan.increment.column-type' = 'int' -- 增量字段类型
,'scan.polling-interval' = '3000' --间隔轮训时间。非必填(不填为离线任务,执行一次就技术),无默认
'sink.all-replace' = 'true', -- 解释如下(其他rdb数据库类似):默认:false。定义了PRIMARY KEY才有效,否则是追加语句
-- sink.all-replace = 'true' 生成如:INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=VALUES(`mid`), `mbb`=VALUES(`mbb`), `sid`=VALUES(`sid`), `sbb`=VALUES(`sbb`) 。会将所有的数据都替换。
-- sink.all-replace = 'false' 生成如:INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=IFNULL(VALUES(`mid`),`mid`), `mbb`=IFNULL(VALUES(`mbb`),`mbb`), `sid`=IFNULL(VALUES(`sid`),`sid`), `sbb`=IFNULL(VALUES(`sbb`),`sbb`) 。如果新值为null,数据库中的旧值不为null,则不会覆盖。
99%的人还看了
相似问题
- 【论文阅读】SPARK:针对视觉跟踪的空间感知在线增量攻击
- 软件过程模型分析与适应场景: 瀑布、原型、增量、螺旋、组件化和统一模型简介
- 后端设计PG liberty的作用和增量式生成
- winscp文件增量同步到linux服务器
- 开发模型(瀑布、螺旋、scrum) 和 测试模型(V、W)、增量和迭代、敏捷(思想)及敏捷开发 scrum
- 4 OpenCV实现多目三维重建(多张图片增量式生成稀疏点云)【附源码】
- 记一次fineBI的增量删除更新BUG
- flink以增量+全量的方式更新广播状态
- 机器学习之单层神经网络的训练:增量规则(Delta Rule)
- 【笔记】离线Ubuntu20.04+mysql 5.7.36 + xtrabackup定时增量备份脚本
猜你感兴趣
版权申明
本文"用 flink 插件chunjun实现全量+增量同步-达梦数据库到postgresql":http://eshow365.cn/6-9955-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!