【flink-sql实战】flink 主键声明与upsert功能实战

news/2024/6/16 2:16:58 标签: sql, flink, 数据库

文章目录

  • 一. flink 主键声明语法
  • 二. 物理表创建联合主键表
  • 三. flink sql使用

flink__3">一. flink 主键声明语法

主键用作 Flink 优化的一种提示信息。主键限制表明一张表或视图的某个(些)列是唯一的并且不包含 Null 值。 主键声明的列都是非 nullable 的。因此主键可以被用作表行级别的唯一标识。

主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,主键都不可以重复定义,否则 Flink 会报错。

 
有效性检查

SQL 标准主键限制可以有两种模式:ENFORCED 或者 NOT ENFORCED。 它申明了是否输入/出数据会做合法性检查(是否唯一)。
 
Flink 不存储数据因此只支持 NOT ENFORCED 模式,即不做检查,用户需要自己保证唯一性。

注意: 在 CREATE TABLE 语句中,创建主键会修改列的 nullable 属性,主键声明的列默认都是非 Nullable 的。

 
sql声明语法:

sql">CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
 ...

<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
...

联合主键声明

sql"> create table t_sink_01 ( 
f1 varchar, 
f2 varchar,
f3 int,
f4 timestamp(3),
f5 varchar, 
primary key(f1,f2)  NOT ENFORCED  -- 主键声明,字段之间逗号分隔
)
with( 
..
) ;

 
 

二. 物理表创建联合主键表

sql">CREATE TABLE test003(
     id INT(10),
     name VARCHAR(25),
     age int(10),
     PRIMARY KEY(id,name));


desc test003

Field|Type       |Null|Key|Default|Extra|
-----+-----------+----+---+-------+-----+
id   |int        |NO  |PRI|       |     |
name |varchar(25)|NO  |PRI|       |     |
age  |int        |YES |   |       |     |

 

flink_sql_78">三. flink sql使用

sql">CREATE TABLE source
(   `id` int,
 	`username` varchar,
 	`age` int
) WITH (
  'connector' = 'binlog-x'
      ,'username' = 'root'
      ,'password' = '11111111'
      ,'cat' = 'insert,delete,update'
      ,'url' = 'jdbc:mysql://10.17.31.234:3306/360test'
      ,'host' = '10.17.31.234'
      ,'port' = '3306'
      -- 什么都不加:最新位置消费
      -- 加文件名,从此文件开头消费
       ,'journal-name' = 'binlog.000194'
      --  ,'timestamp'='169944781200'
      ,'table' = '360test.dimension_table'
      ,'timestamp-format.standard' = 'SQL'
      );
CREATE TABLE sink
(   `id` int,
 	`name` varchar,
 	`age` int,
 	PRIMARY KEY (id,name) NOT ENFORCED
) WITH (
        'connector' = 'mysql-x',
           'url' = 'jdbc:mysql://localhost:3306/360test',
           'table-name' = 'test003',
           'username' = 'root',
           'password' = '11111111',
           'sink.buffer-flush.max-rows' = '1024', -- 批量写数据条数,默认:1024
           'sink.buffer-flush.interval' = '10000', -- 批量写时间间隔,默认:10000毫秒
           -- insert时的选项,覆盖或者忽略。
           -- 声明了主键时,设置all-replace为true,全部更新覆盖,
           -- 或者是忽略,即来的新数据不插入?
           'sink.all-replace' = 'true', -- 解释如下(其他rdb数据库类似):默认:false。定义了PRIMARY KEY才有效,否则是追加语句
                                       -- sink.all-replace = 'true' 生成如:INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=VALUES(`mid`), `mbb`=VALUES(`mbb`), `sid`=VALUES(`sid`), `sbb`=VALUES(`sbb`) 。会将所有的数据都替换。
                                       -- sink.all-replace = 'false' 生成如:INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=IFNULL(VALUES(`mid`),`mid`), `mbb`=IFNULL(VALUES(`mbb`),`mbb`), `sid`=IFNULL(VALUES(`sid`),`sid`), `sbb`=IFNULL(VALUES(`sbb`),`sbb`) 。如果新值为null,数据库中的旧值不为null,则不会覆盖。
           -- 新增写入选项:默认会判断,当声明了key则是update
           'sink.parallelism' = '1'    -- 写入结果的并行度,默认:null
      );
insert into sink select id,username as name,age as age  from source;



http://www.niftyadmin.cn/n/5170637.html

相关文章

AR工业眼镜:智能化生产新时代的引领者!!

科技飞速发展&#xff0c;人工智能与增强现实&#xff08;AR&#xff09;技术结合正在改变生活工作方式。AR工业眼镜在生产领域应用广泛&#xff0c;具有实时信息展示、智能导航定位、远程协作培训、智能安全监测等功能&#xff0c;提高生产效率、降低操作风险&#xff0c;为企…

AI批量剪辑矩阵托管系统----源码技术开发

AI批量剪辑矩阵托管系统----源码技术开发 抖音账号矩阵系统是基于抖音开放平台研发的用于管理和运营多个抖音账号的平台。它可以帮助用户管理账号、发布内容、营销推广、分析数据等多项任务&#xff0c;从而提高账号的曝光度和影响力。 具体来说&#xff0c;抖音账号矩阵系统可…

linux espeak语音tts;pyttsx3 ubuntu使用

整体使用espeak声音很机械不太自然 1、linux espeak语音tts 安装&#xff1a; sudo apt install espeak使用&#xff1a; #中文男声 espeak -v zh 你好 #中文女声 espeak -v zhf3 你好 #粤语男声 espeak -v zhy 你好注意&#xff1a;espeak -v zh 你好 &#xff08;Full d…

java开发宝典

Java命名规范 1&#xff1a;代码中的命名均不能以下划线或美元符号开始&#xff0c;也不能以下划线或美元符号结束。 反例&#xff1a;_name / __name / $name / name_ / name$ / name__ 。 2&#xff1a;禁止使用拼音和英文混合。 反例&#xff1a;DaZhePromotion [打折] / …

30基于Feign远程调用

2.1.Feign替代RestTemplate Fegin的使用步骤如下&#xff1a; 1&#xff09;引入依赖 我们在order-service服务的pom文件中引入feign的依赖&#xff1a; <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-start…

C++模板特化详解

一、为什么需要模板特化 这周四&#xff0c;帮一个长相英俊的同事排查问题&#xff0c;他写了一段类似于这样的逻辑&#xff1a; class student{int age 25; public:void mycout(){cout<<age<<endl;} };template<typename T> class Base {T t; public:Bas…

JTS: 17 DiscreteHausdorffDistance 豪斯多夫距离计算

这里写目录标题 版本代码 版本 org.locationtech.jts:jts-core:1.19.0 链接: github 代码 import org.locationtech.jts.algorithm.distance.DiscreteHausdorffDistance; import org.locationtech.jts.algorithm.distance.PointPairDistance; import org.locationtech.jts.ge…

2022ICPC济南站

K Stack Sort 题意&#xff1a;给你一个长度为n的排列&#xff0c;设有m个栈&#xff0c;你需要将这n个数按出现顺序入栈&#xff0c;每次入栈操作从m个栈中选择一个栈从栈顶入栈。当所有元素入栈完成后&#xff0c;需要不断选择栈&#xff0c;将栈中元素弹空。需满足出栈顺序…