用 flink 插件chunjun实现全量+增量同步-达梦数据库到postgresql

news/2024/9/6 21:18:55 标签: flink, 数据库, 大数据

flink 插件chunjun实现全量+增量同步,这里以达梦数据库同步到postgresql数据库为例。

纯钧下载地址:纯钧

纯钧是一款稳定、易用、高效、批流一体的数据集成框架,目前基于实时计算引擎Flink实现多种异构数据源之间的数据同步与计算,已在上千家公司部署且稳定运行。

达梦表ddl:

CREATE TABLE SYSDBA.SOURCE_TABLE (
	ID INT NOT NULL,
	NAME VARCHAR(100),
	CREATE_TIME INT,
	CONSTRAINT PK_SOURCE_TABLE_ID PRIMARY KEY (ID)
);
CREATE UNIQUE INDEX INDEX33555468 ON SYSDBA.SOURCE_TABLE (ID);

postgresql ddl:

CREATE TABLE public.SINK_TABLE (
	id int4 NOT NULL,
	"name" varchar(100) NULL,
	create_time int4 NULL,
	CONSTRAINT pk_SINK_TABLE_id2 PRIMARY KEY (id)
);

纯钧的sql:

create table SOURCE_TABLE(
    ID  INT, 
    NAME varchar(200),
    CREATE_TIME INT
    )
with (
      'connector' = 'dm-x',
      'url' = 'jdbc:dm://11.0.24.107:5236',
      'schema' = 'SYSDBA',
      'table-name' = 'SOURCE_TABLE',
      'username' = 'SYSDBA',
      'password' = 'SYSDBA001',
      'scan.increment.column' = 'CREATE_TIME',
      'scan.increment.column-type' = 'int',
      'scan.polling-interval' = '3000',
      'scan.fetch-size' = '200',
      'scan.query-timeout' = '10'
);
CREATE TABLE SINK_TABLE (
    id INT,
    name varchar(200),
    create_time INT,
    PRIMARY KEY (id) NOT ENFORCED)
    with (
    'password'='sys',
    'connector'='postgresql-x',
    'sink.buffer-flush.interval'='1000',
    'sink.all-replace'='true',
    'sink.buffer-flush.max-rows'='100',
    'table-name'='SINK_TABLE',
    'sink.parallelism'='1',
    'url'='jdbc:postgresql://11.0.101.10:39001/sys',
    'username'='sys'
    );
insert into SINK_TABLE select ID,NAME,CREATE_TIME from SOURCE_TABLE;

原理就是根据create_time这个字段的更新而增量更新修改、添加操作。

参数解释:

,'scan.increment.column' = 'create_time' -- 增量字段,根据这个字段判断是否更新

,'scan.increment.column-type' = 'int'  -- 增量字段类型

,'scan.polling-interval' = '3000' --间隔轮训时间。非必填(不填为离线任务,执行一次就技术),无默认

      'sink.all-replace' = 'true', -- 解释如下(其他rdb数据库类似):默认:false。定义了PRIMARY KEY才有效,否则是追加语句

                                  -- sink.all-replace = 'true' 生成如:INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=VALUES(`mid`), `mbb`=VALUES(`mbb`), `sid`=VALUES(`sid`), `sbb`=VALUES(`sbb`) 。会将所有的数据都替换。

                                  -- sink.all-replace = 'false' 生成如:INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=IFNULL(VALUES(`mid`),`mid`), `mbb`=IFNULL(VALUES(`mbb`),`mbb`), `sid`=IFNULL(VALUES(`sid`),`sid`), `sbb`=IFNULL(VALUES(`sbb`),`sbb`) 。如果新值为null,数据库中的旧值不为null,则不会覆盖。


http://www.niftyadmin.cn/n/5036011.html

相关文章

uni-app微信小程序canvas中使用canvasToTempFilePath在手机上导出图片尺寸与实际不符

问题描述:比如图片的尺寸是1125*2001像素,这样用微信开发者工具下载下来的图片尺寸是1125*2001像素,用不同的手机去操作,下载出来的图片尺寸都不一样,和原图片尺寸差距很大。 解决方案:canvas写入的时候是…

RK3588平台开发系列讲解(项目篇)视频监控之RTMP推流

文章目录 一、RTMP协议是什么二、RTMP 的原理三、Nginx 流媒体服务器四、FFmpeg 推流沉淀、分享、成长,让自己和他人都能有所收获!😄 📢 目前常见的视频监控和视频直播都是使用了 RTMP、RTSP、HLS、MPEG-DASH、WebRTC流媒体传输协议等。 视频监控项目组成,分为三部分:…

揭秘Spring Boot内嵌Tomcat原理

tomcat 介绍 tomcat 是 web容器(servlet 容器),不管请求是访问静态资源HTML、JSP还是java接口,对tomcat而言,都是通过servlet访问: 访问静态资源,tomcat 会交由一个叫做DefaultServlet的类来处…

java框架-Spring-AOP

AOP:动态代理 开发步骤: 导入aop模块定义业务逻辑类定义切面类; -. 切面类标注:Aspect -. 切面类注解: Before: 前置通知, 在方法执行之前执行 After: 后置通知, 在方法执行之后执行 。 AfterRunning: 返回通知, 在方法返回结果之…

04-Docker应用部署

MySQL部署 需求 在Docker容器中部署MySQL,并通过外部MySQL客户端操作MySQL Server 实现步骤 搜索MySQL镜像 docker search mysql拉取MySQL镜像 docker pull mysql:5.6创建MySQL容器、设置端口映射、目录映射 # 在/root目录下创建mysql目录用于存储mysql数据信息 mk…

【前端打怪升级日志之微前端框架篇】微前端qiankun框架子应用间跳转方法

参考链接qiankun官网&#xff1a;微应用之间如何跳转&#xff1f; 1.主应用、子应用路由都是hash模式 主应用根据 hash 来判断微应用&#xff0c;无需考虑该问题 2.主应用根据path判断子应用 方法实现适用条件参数传递存在问题a标签跳转<a href"/toA"></…

OSCP系列靶场-Intermediate-BTRSys2.1保姆级

OSCP系列靶场-Intermediate-BTRSys2.1 目录 OSCP系列靶场-Intermediate-BTRSys2.1总结准备工作信息收集-端口扫描目标开放端口收集目标端口对应服务探测 信息收集-端口测试21-FTP端口的信息收集21-FTP版本版本信息与MSF利用21-FTP端口匿名登录测试(成功)21-FTP端口-文件GET收集…

Zipping

Zipping 信息收集端口扫描目录扫描webbanner信息收集 漏洞利用空字节绕过---->失败sqlI-preg_match bypass反弹shell 稳定维持 提权-共享库漏洞 参考&#xff1a;https://rouvin.gitbook.io/ibreakstuff/writeups/htb-season-2/zipping#sudo-privileges-greater-than-stock-…