Skip to content

GitLab

  • Projects
  • Groups
  • Snippets
  • Help
    • Loading...
  • Help
    • Help
    • Support
    • Community forum
    • Submit feedback
    • Contribute to GitLab
  • Sign in / Register
K
kb
  • Project overview
    • Project overview
    • Details
    • Activity
    • Releases
  • Repository
    • Repository
    • Files
    • Commits
    • Branches
    • Tags
    • Contributors
    • Graph
    • Compare
  • Issues 2
    • Issues 2
    • List
    • Boards
    • Labels
    • Service Desk
    • Milestones
  • Merge requests 0
    • Merge requests 0
  • Operations
    • Operations
    • Incidents
  • Analytics
    • Analytics
    • Repository
    • Value Stream
  • Wiki
    • Wiki
  • Members
    • Members
  • Activity
  • Graph
  • Create a new issue
  • Commits
  • Issue Boards
Collapse sidebar
  • granite
  • kb
  • Wiki
    • Data_stream
    • Equity_penetration
  • update_nebula

update_nebula · Changes

Page history
update: nebula增量例行 authored Jan 07, 2022 by 李子健's avatar 李子健
Hide whitespace changes
Inline Side-by-side
Showing with 89 additions and 26 deletions
+89 -26
  • data_stream/equity_penetration/update_nebula.md data_stream/equity_penetration/update_nebula.md +89 -26
  • No files found.
data_stream/equity_penetration/update_nebula.md
View page @ 94f880b2
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
## tag_firm ## tag_firm
### 逻辑 ### 逻辑
```plantuml ```plantuml
@startuml @startuml
database nebula的tag_firm database nebula的tag_firm
...@@ -26,6 +27,7 @@ kafka --> 中间表nebula_tag_firm: 例行入表 ...@@ -26,6 +27,7 @@ kafka --> 中间表nebula_tag_firm: 例行入表
融合库 --> 中间表nebula_tag_firm: 存量入表后例行 融合库 --> 中间表nebula_tag_firm: 存量入表后例行
@enduml @enduml
``` ```
``` ```
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入中间表;将company_name_digest作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的fid字段不为null时,例行同步的程序中才将对应记录入nebula 从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入中间表;将company_name_digest作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的fid字段不为null时,例行同步的程序中才将对应记录入nebula
``` ```
...@@ -108,7 +110,7 @@ CREATE TABLE `nebula_tag_firm` ( ...@@ -108,7 +110,7 @@ CREATE TABLE `nebula_tag_firm` (
#### tb_company #### tb_company
``` ```
通过binlog更新 先存量入表,后续通过binlog更新
``` ```
| tb_company | nebula_tag_company | 备注 | | tb_company | nebula_tag_company | 备注 |
...@@ -153,6 +155,7 @@ CREATE TABLE `nebula_tag_firm` ( ...@@ -153,6 +155,7 @@ CREATE TABLE `nebula_tag_firm` (
## tag_person ## tag_person
### 逻辑 ### 逻辑
```plantuml ```plantuml
@startuml @startuml
database nebula的tag_person database nebula的tag_person
...@@ -176,6 +179,7 @@ tb_person --> 中间表nebula_tag_person: 存量入表后例行 ...@@ -176,6 +179,7 @@ tb_person --> 中间表nebula_tag_person: 存量入表后例行
中间表nebula_tag_person --> nebula的tag_person: 存量入表后例行 中间表nebula_tag_person --> nebula的tag_person: 存量入表后例行
@enduml @enduml
``` ```
``` ```
从融合库中提前将数据准备至nebula中间表,所需字段通过binlog增量入中间表;将ppid作为唯一键,由于其他表中有的ppid表tb_person里一定有,所以用ac_partner_num、ac_employee_num、ac_legalperson_num三个字段都不为null当做是否更新nebula的标志。 从融合库中提前将数据准备至nebula中间表,所需字段通过binlog增量入中间表;将ppid作为唯一键,由于其他表中有的ppid表tb_person里一定有,所以用ac_partner_num、ac_employee_num、ac_legalperson_num三个字段都不为null当做是否更新nebula的标志。
``` ```
...@@ -223,7 +227,7 @@ CREATE TABLE `nebula_tag_person` ( ...@@ -223,7 +227,7 @@ CREATE TABLE `nebula_tag_person` (
#### tb_person #### tb_person
``` ```
通过binlog更新 先存量入表,后续通过binlog更新
``` ```
| tb_person | nebula_tag_person | 备注 | | tb_person | nebula_tag_person | 备注 |
...@@ -236,7 +240,7 @@ CREATE TABLE `nebula_tag_person` ( ...@@ -236,7 +240,7 @@ CREATE TABLE `nebula_tag_person` (
#### tb_company_employee #### tb_company_employee
``` ```
通过binlog更新is_history=0 先存量入表,后续通过binlog更新is_history=0
``` ```
| tb_company_employee | nebula_tag_person | 备注 | | tb_company_employee | nebula_tag_person | 备注 |
...@@ -247,7 +251,7 @@ CREATE TABLE `nebula_tag_person` ( ...@@ -247,7 +251,7 @@ CREATE TABLE `nebula_tag_person` (
#### tb_company_partner #### tb_company_partner
``` ```
通过binlog更新is_history=0 先存量入表,后续通过binlog更新is_history=0
``` ```
| tb_company_partner | nebula_tag_person | 备注 | | tb_company_partner | nebula_tag_person | 备注 |
...@@ -258,7 +262,7 @@ CREATE TABLE `nebula_tag_person` ( ...@@ -258,7 +262,7 @@ CREATE TABLE `nebula_tag_person` (
#### tb_company_legalperson #### tb_company_legalperson
``` ```
通过binlog更新is_history=0 先存量入表,后续通过binlog更新is_history=0
``` ```
| tb_company_legalperson | nebula_tag_person | 备注 | | tb_company_legalperson | nebula_tag_person | 备注 |
...@@ -269,6 +273,7 @@ CREATE TABLE `nebula_tag_person` ( ...@@ -269,6 +273,7 @@ CREATE TABLE `nebula_tag_person` (
## edge_serve ## edge_serve
### 逻辑 ### 逻辑
```plantuml ```plantuml
@startuml @startuml
database nebula的edge_serve database nebula的edge_serve
...@@ -290,6 +295,7 @@ kafka --> 中间表nebula_edge_serve: 例行入表 ...@@ -290,6 +295,7 @@ kafka --> 中间表nebula_edge_serve: 例行入表
融合库 --> 中间表nebula_edge_serve: 存量入表后例行 融合库 --> 中间表nebula_edge_serve: 存量入表后例行
@enduml @enduml
``` ```
``` ```
从mongo和融合库中提前将数据准备至nebula中间表,mongo所需字段存量入表,后续通过kafka增量入中间表;将company_name_digest和employee_name作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的pid和fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula 从mongo和融合库中提前将数据准备至nebula中间表,mongo所需字段存量入表,后续通过kafka增量入中间表;将company_name_digest和employee_name作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的pid和fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
``` ```
...@@ -332,7 +338,7 @@ CREATE TABLE `nebula_edge_serve` ( ...@@ -332,7 +338,7 @@ CREATE TABLE `nebula_edge_serve` (
#### tb_company_enployee #### tb_company_enployee
``` ```
通过binlog更新 先存量入表,后续通过binlog更新
``` ```
| tb_company_enployee | nebula_edge_serve | 备注 | | tb_company_enployee | nebula_edge_serve | 备注 |
...@@ -359,6 +365,7 @@ CREATE TABLE `nebula_edge_serve` ( ...@@ -359,6 +365,7 @@ CREATE TABLE `nebula_edge_serve` (
## edge_invest_h ## edge_invest_h
### 逻辑 ### 逻辑
```plantuml ```plantuml
@startuml @startuml
database nebula的edge_invest_h database nebula的edge_invest_h
...@@ -380,6 +387,7 @@ kafka --> 中间表nebula_edge_invest_h: 例行入表 ...@@ -380,6 +387,7 @@ kafka --> 中间表nebula_edge_invest_h: 例行入表
融合库 --> 中间表nebula_edge_invest_h: 存量入表后例行 融合库 --> 中间表nebula_edge_invest_h: 存量入表后例行
@enduml @enduml
``` ```
``` ```
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表;将company_name_digest和partner_name作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的pid和fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula 从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表;将company_name_digest和partner_name作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的pid和fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
``` ```
...@@ -425,7 +433,7 @@ CREATE TABLE `nebula_edge_invest_h` ( ...@@ -425,7 +433,7 @@ CREATE TABLE `nebula_edge_invest_h` (
#### tb_company_partner #### tb_company_partner
``` ```
通过binlog更新 先存量入表,后续通过binlog更新
``` ```
| tb_company_partner | nebula_edge_invest_h | 备注 | | tb_company_partner | nebula_edge_invest_h | 备注 |
...@@ -454,6 +462,7 @@ CREATE TABLE `nebula_edge_invest_h` ( ...@@ -454,6 +462,7 @@ CREATE TABLE `nebula_edge_invest_h` (
## edge_invest_c ## edge_invest_c
### 逻辑 ### 逻辑
```plantuml ```plantuml
@startuml @startuml
database nebula的edge_invest_c database nebula的edge_invest_c
...@@ -475,6 +484,7 @@ kafka --> 中间表nebula_edge_invest_c: 例行入表 ...@@ -475,6 +484,7 @@ kafka --> 中间表nebula_edge_invest_c: 例行入表
融合库 --> 中间表nebula_edge_invest_c: 存量入表后例行 融合库 --> 中间表nebula_edge_invest_c: 存量入表后例行
@enduml @enduml
``` ```
``` ```
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表;将company_name_digest和partner_company_name_digest作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的s_fid和e_fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula 从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表;将company_name_digest和partner_company_name_digest作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的s_fid和e_fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
``` ```
...@@ -519,7 +529,7 @@ CREATE TABLE `nebula_edge_invest_c` ( ...@@ -519,7 +529,7 @@ CREATE TABLE `nebula_edge_invest_c` (
#### tb_company_partner #### tb_company_partner
``` ```
通过binlog更新 先存量入表,后续通过binlog更新
``` ```
| tb_company_partner | nebula_edge_invest_h | 备注 | | tb_company_partner | nebula_edge_invest_h | 备注 |
...@@ -547,24 +557,28 @@ CREATE TABLE `nebula_edge_invest_c` ( ...@@ -547,24 +557,28 @@ CREATE TABLE `nebula_edge_invest_c` (
## edge_own ## edge_own
### 逻辑 ### 逻辑
```plantuml ```plantuml
@startuml @startuml
database nebula的edge_own database nebula的edge_own
file 爬虫数据 file 爬虫数据
database 融合库 database 融合库
database 中间表
爬虫数据 --> 融合库 爬虫数据 --> 融合库
融合库 --> nebula的edge_own: binlog更新 融合库 --> 中间表: binlog更新
中间表 --> nebula的edge_own: binlog更新
@enduml @enduml
``` ```
``` ```
直接通过监控融合库tb_company_legalperson表的binlog更新 先存量入中间表,然后通过监控融合库tb_company_legalperson表的binlog更新中间表,后续通过中间表binlog更新nebula
``` ```
### nebula结构 ### nebula结构
```sql ```sql
自然人作为企业法定代表人,与企业间关系的集合,类型为edge,用<ppid, edge_own, (Rank), fid>唯一标识,表名为edge_own 自然人作为企业法定代表人,与企业间关系的集合,类型为edge,用<pid, edge_own, (Rank), fid>唯一标识,表名为edge_own
CREATE EDGE `edge_own` ( CREATE EDGE `edge_own` (
`ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id", `ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id",
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id", `company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
...@@ -572,6 +586,24 @@ CREATE EDGE `edge_own` ( ...@@ -572,6 +586,24 @@ CREATE EDGE `edge_own` (
) ttl_duration = 0, ttl_col = "", comment = "人员法定代表人edge"; ) ttl_duration = 0, ttl_col = "", comment = "人员法定代表人edge";
``` ```
### 中间表表结构
```sql
CREATE TABLE `nebula_edge_own` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`pid` varchar(50) DEFAULT NULL COMMENT '=p和ppid拼接,对应nebula中的pid',
`fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的fid',
`ppid` char(32) DEFAULT NULL COMMENT '人员唯一id',
`company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
`is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_name_id` (`company_name_digest`, `ppid`),
KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_own的数据中间表';
```
### 涉及到的数据源 ### 涉及到的数据源
#### tb_company_legalperson #### tb_company_legalperson
...@@ -580,33 +612,39 @@ CREATE EDGE `edge_own` ( ...@@ -580,33 +612,39 @@ CREATE EDGE `edge_own` (
通过binlog更新,lp_ppid is not null and company_name_digest is not null 通过binlog更新,lp_ppid is not null and company_name_digest is not null
``` ```
| tb_company_legalperson | 备注 | | tb_company_legalperson | nebula_edge_own | 备注 |
| ---------------------- | ---- | | ---------------------- | ------------------- | --------------------------- |
| lp_ppid | | | lp_ppid | ppid | |
| company_name_digest | | | company_name_digest | company_name_digest | |
| is_history | | | is_history | is_history | |
| update_time | | | lp_ppid | pid | pid='p'+lp_ppid |
| company_name_digest | fid | fid='f'+company_name_digest |
## edge_own_c ## edge_own_c
### 逻辑 ### 逻辑
```plantuml ```plantuml
@startuml @startuml
database nebula的edge_own_c database nebula的edge_own_c
file 爬虫数据 file 爬虫数据
database 融合库 database 融合库
database 中间表
爬虫数据 --> 融合库 爬虫数据 --> 融合库
融合库 --> nebula的edge_own_c: binlog更新 融合库 --> 中间表: binlog更新
中间表 --> nebula的edge_own_c: binlog更新
@enduml @enduml
``` ```
``` ```
直接通过监控融合库tb_company_legalperson表的binlog更新 先存量入中间表,然后通过监控融合库tb_company_legalperson表的binlog更新中间表,后续通过中间表binlog更新nebula
``` ```
### nebula结构 ### nebula结构
```sql ```sql
企业作为企业法定代表人,与企业间关系的集合,类型为edge,用<s_fid,edge_own_c,(Rank),e_fid>唯一标识,表名为edge_own_c
CREATE EDGE `edge_own_c` ( CREATE EDGE `edge_own_c` (
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id", `company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
`lp_company_name_digest` fixed_string(32) NOT NULL COMMENT "法人企业唯一id", `lp_company_name_digest` fixed_string(32) NOT NULL COMMENT "法人企业唯一id",
...@@ -614,6 +652,24 @@ CREATE EDGE `edge_own_c` ( ...@@ -614,6 +652,24 @@ CREATE EDGE `edge_own_c` (
) ttl_duration = 0, ttl_col = "", comment = "公司法定代表人edge"; ) ttl_duration = 0, ttl_col = "", comment = "公司法定代表人edge";
``` ```
### 中间表表结构
```sql
CREATE TABLE `nebula_edge_own_c` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`s_fid` varchar(50) DEFAULT NULL COMMENT '=f和lp_company_name_digest拼接,对应nebula中的s_fid',
`e_fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的e_fid',
`lp_company_name_digest` char(32) DEFAULT NULL COMMENT '法人企业唯一id',
`company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
`is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_name_digest` (`company_name_digest`, `branch_company_name_digest`),
KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_ow_c的数据中间表';
```
### 涉及到的数据源 ### 涉及到的数据源
#### tb_company_legalperson #### tb_company_legalperson
...@@ -622,16 +678,18 @@ CREATE EDGE `edge_own_c` ( ...@@ -622,16 +678,18 @@ CREATE EDGE `edge_own_c` (
通过binlog更新,lp_company_name_digest is not null and company_name_digest is not null 通过binlog更新,lp_company_name_digest is not null and company_name_digest is not null
``` ```
| tb_company_legalperson | 备注 | | tb_company_legalperson | nebula_edge_own_c | 备注 |
| ---------------------- | ---- | | ---------------------- | ---------------------- | -------------------------------- |
| lp_company_name_digest | | | lp_company_name_digest | lp_company_name_digest | |
| company_name_digest | | | company_name_digest | company_name_digest | |
| is_history | | | is_history | is_history | |
| update_time | | | lp_company_name_digest | s_fid | s_fid='f'+lp_company_name_digest |
| company_name_digest | e_fid | e_fid='f'+company_name_digest |
## edge_branch ## edge_branch
### 逻辑 ### 逻辑
```plantuml ```plantuml
@startuml @startuml
database nebula的edge_branch database nebula的edge_branch
...@@ -649,8 +707,9 @@ utn_ic.company_branch --> kafka: 增量写kafka ...@@ -649,8 +707,9 @@ utn_ic.company_branch --> kafka: 增量写kafka
kafka --> 中间表nebula_edge_branch: 例行入表 kafka --> 中间表nebula_edge_branch: 例行入表
@enduml @enduml
``` ```
``` ```
从mongo提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表 从mongo提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表,监控中间表的binlog更新nebula
``` ```
### nebula结构 ### nebula结构
...@@ -686,6 +745,10 @@ CREATE TABLE `nebula_edge_branch` ( ...@@ -686,6 +745,10 @@ CREATE TABLE `nebula_edge_branch` (
#### utn_ic.company_branch #### utn_ic.company_branch
```
存量入表后,通过kafka增量入表
```
| company_branch | nebula_edge_branch | 备注 | | company_branch | nebula_edge_branch | 备注 |
| ------------------- | -------------------------- | ------------------------------------------------------------ | | ------------------- | -------------------------- | ------------------------------------------------------------ |
| branch_name_digest | branch_company_name_digest | | | branch_name_digest | branch_company_name_digest | |
......
Clone repository
  • README
  • basic_guidelines
  • basic_guidelines
    • basic_guidelines
    • dev_guide
    • project_build
    • 开发流程
  • best_practice
  • best_practice
    • AlterTable
    • RDS
    • azkaban
    • create_table
    • design
    • elasticsearch
    • elasticsearch
      • ES运维
    • logstash
View All Pages