Skip to content

GitLab

  • Projects
  • Groups
  • Snippets
  • Help
    • Loading...
  • Help
    • Help
    • Support
    • Community forum
    • Submit feedback
    • Contribute to GitLab
  • Sign in / Register
K
kb
  • Project overview
    • Project overview
    • Details
    • Activity
    • Releases
  • Repository
    • Repository
    • Files
    • Commits
    • Branches
    • Tags
    • Contributors
    • Graph
    • Compare
  • Issues 2
    • Issues 2
    • List
    • Boards
    • Labels
    • Service Desk
    • Milestones
  • Merge requests 0
    • Merge requests 0
  • Operations
    • Operations
    • Incidents
  • Analytics
    • Analytics
    • Repository
    • Value Stream
  • Wiki
    • Wiki
  • Members
    • Members
  • Activity
  • Graph
  • Create a new issue
  • Commits
  • Issue Boards
Collapse sidebar
  • granite
  • kb
  • Wiki
    • Data_pump
  • data_pump

Last edited by 李林坳 Sep 23, 2022
Page history

data_pump

data_pump使用说明

data_pump是一个数据清洗引擎,通过简单的yml配置文件,动态驱动多种格式数据的输入、转换、输出。从而简化数据清洗流程、提高生产效率

清洗流程中数据以字典(dict对象)或列表的形式流转,列表中的元素为字典对象

用法

Usage: data_pump.py pump-data [OPTIONS]

Options:
  -c, --config FILENAME        yml配置文件的路径,可以是相对路径,必要参数
  --default_config TEXT        指定的配置环境, 非必要参数
  -v, --assign_attrs <TEXT TEXT>...   指定配置文件中的变量值,可以有多组, 非必要参数
  -r, --reader TEXT
  --help                          Show this message and exit.
(collie) 

执行命令:

python data_pump.py pump-data --config 配置文件路径 --default_config 配置环境 -v 变量名1 变量值 -v 变量名2 变量值 

核心组件:

核心组件 插件 说明
输入 readers file 文件输入
sql 数据库输入
kafka kafka输入
mongodb MongoDB输入
MongoChangeReader MongoDB oplog输入
输出 writers stdout 输出到终端标准
file 输出到本地文件
kafka 输出到kafka
es 输出到es
udm 输出到Collie的UDM模块
过滤 filters field 对数据增、删、改
udm 使用Collie的UDM模块作为过滤器
date 对日期字段进行格式化
regex 过滤某个字段与re表达式匹配的数据
http 通过http更新数据内容
match 过滤与模式匹配的数据
bloom 布隆过滤器去重

配置文件说明

变量

内置时间变量

变量名 说明
today 今天的日期,默认的格式为:%Y%m%d
yesterday 昨天的日期,默认的格式为:%Y%m%d
last_month 上个月一号的日期, 默认的格式为:%Y-%m-%d
N_days_ago N天之前的日期,N可以取值[1, 30]
N_hours_ago N小时之前的时间, N可以取值[1, 23]

所有时间变量都可以自定义日期格式。方法如下:

{变量名:日期格式} 

日期格式为python日期格式字串

如今天的日期是 2021年2月4日。希望输出的值为 2021.02.04,则只需将变量写为 {today:%Y.%m.%d}

注意 自定义格式为20201222以后的版本支持的功能。如需使用请更新collie版本,并重新安装collie_common包。

配置样例:

# job_id作为配置文件的唯一标识,用32位字符串标识,可随机生成一个uuid
job_id: xxxx

# boss 项目负责人钉钉使用的手机号,可以定义多个,用英文逗号分割 
boss: 15001727350

# 自定义变量,可以定义多个,引用处使用方法 "{v1}"
vars:
    v1: 'str1'
    v2: 192168

# 格式化处理器
formaters:
    formater1: bson.Bson
    formater2: json_formater.Json
    formater3: tsv.Tsv
    formaterN: xx.xxx

# 输入定义,支持多种数据源
readers:
  reader1:                       # 定义的reader名称,SqlDocReader类的初始化参数name的值
    class: sql.SqlDocReader      # 使用的readers的类,即readers目录下sql.py文件中的类SqlDocReader
    init:                        # SqlDocReader类的初始化参数,其中name不需要指定,name即上面的reader1
      formater: formater1        # formater1 对应格式化处理器中定义的formater1,即用bson格式读取数据

# 输出,支持写入多种存储容器
writers:
    writer1: 
      class: stdout.StdoutWriter
      init:
        formater: formater2

# 过滤器
filters:
    filter1: 
      class: field.Add

# 配置数据清洗流程
# 默认的清洗流程,即不指定--default_config时的处理流程
pump:
    reader:
      - reader1
    writer:
      - writer1
      - wirter2
    filter:
      - filter1

# 自定义配置环境,动态构建清洗组件
# 自定义的配置环境中的设置会覆盖默认设置
__dev:               # 自定义配置环境的名称为dev,需要在该环境下执行,则设置 --default_config dev
  pump:              # 以下配置会将默认流程的reader 修改为[reader3],writer修改为[writer3], filter保持不变,仍为[filter1]
    reader:
      - reader3
    writer:
      - writer3
  vars:
    v1: 'sss'      # 将变更v1的值从 'str1' 改为'sss'

__product:         # 需要在该环境下执行,则设置 --default_config product
    vars:
      v2: 'ttt'
Clone repository
  • README
  • basic_guidelines
  • basic_guidelines
    • basic_guidelines
    • dev_guide
    • project_build
    • 开发流程
  • best_practice
  • best_practice
    • AlterTable
    • RDS
    • azkaban
    • create_table
    • design
    • elasticsearch
    • elasticsearch
      • ES运维
    • logstash
View All Pages