data_pump使用说明
data_pump是一个数据清洗引擎,通过简单的yml配置文件,动态驱动多种格式数据的输入、转换、输出。从而简化数据清洗流程、提高生产效率
清洗流程中数据以字典(dict对象)或列表的形式流转,列表中的元素为字典对象
用法
Usage: data_pump.py pump-data [OPTIONS]
Options:
-c, --config FILENAME yml配置文件的路径,可以是相对路径,必要参数
--default_config TEXT 指定的配置环境, 非必要参数
-v, --assign_attrs <TEXT TEXT>... 指定配置文件中的变量值,可以有多组, 非必要参数
-r, --reader TEXT
--help Show this message and exit.
(collie)
执行命令:
python data_pump.py pump-data --config 配置文件路径 --default_config 配置环境 -v 变量名1 变量值 -v 变量名2 变量值
核心组件:
核心组件 | 插件 | 说明 |
---|---|---|
输入 readers | file | 文件输入 |
sql | 数据库输入 | |
kafka | kafka输入 | |
mongodb | MongoDB输入 | |
MongoChangeReader | MongoDB oplog输入 | |
输出 writers | stdout | 输出到终端标准 |
file | 输出到本地文件 | |
kafka | 输出到kafka | |
es | 输出到es | |
udm | 输出到Collie的UDM模块 | |
过滤 filters | field | 对数据增、删、改 |
udm | 使用Collie的UDM模块作为过滤器 | |
date | 对日期字段进行格式化 | |
regex | 过滤某个字段与re表达式匹配的数据 | |
http | 通过http更新数据内容 | |
match | 过滤与模式匹配的数据 | |
bloom | 布隆过滤器去重 |
配置文件说明
变量
内置时间变量
变量名 | 说明 |
---|---|
today | 今天的日期,默认的格式为:%Y%m%d |
yesterday | 昨天的日期,默认的格式为:%Y%m%d |
last_month | 上个月一号的日期, 默认的格式为:%Y-%m-%d |
N_days_ago | N天之前的日期,N可以取值[1, 30] |
N_hours_ago | N小时之前的时间, N可以取值[1, 23] |
所有时间变量都可以自定义日期格式。方法如下:
{变量名:日期格式}
日期格式为python日期格式字串
如今天的日期是 2021年2月4日。希望输出的值为 2021.02.04,则只需将变量写为 {today:%Y.%m.%d}
注意 自定义格式为20201222以后的版本支持的功能。如需使用请更新collie版本,并重新安装collie_common包。
配置样例:
# job_id作为配置文件的唯一标识,用32位字符串标识,可随机生成一个uuid
job_id: xxxx
# boss 项目负责人钉钉使用的手机号,可以定义多个,用英文逗号分割
boss: 15001727350
# 自定义变量,可以定义多个,引用处使用方法 "{v1}"
vars:
v1: 'str1'
v2: 192168
# 格式化处理器
formaters:
formater1: bson.Bson
formater2: json_formater.Json
formater3: tsv.Tsv
formaterN: xx.xxx
# 输入定义,支持多种数据源
readers:
reader1: # 定义的reader名称,SqlDocReader类的初始化参数name的值
class: sql.SqlDocReader # 使用的readers的类,即readers目录下sql.py文件中的类SqlDocReader
init: # SqlDocReader类的初始化参数,其中name不需要指定,name即上面的reader1
formater: formater1 # formater1 对应格式化处理器中定义的formater1,即用bson格式读取数据
# 输出,支持写入多种存储容器
writers:
writer1:
class: stdout.StdoutWriter
init:
formater: formater2
# 过滤器
filters:
filter1:
class: field.Add
# 配置数据清洗流程
# 默认的清洗流程,即不指定--default_config时的处理流程
pump:
reader:
- reader1
writer:
- writer1
- wirter2
filter:
- filter1
# 自定义配置环境,动态构建清洗组件
# 自定义的配置环境中的设置会覆盖默认设置
__dev: # 自定义配置环境的名称为dev,需要在该环境下执行,则设置 --default_config dev
pump: # 以下配置会将默认流程的reader 修改为[reader3],writer修改为[writer3], filter保持不变,仍为[filter1]
reader:
- reader3
writer:
- writer3
vars:
v1: 'sss' # 将变更v1的值从 'str1' 改为'sss'
__product: # 需要在该环境下执行,则设置 --default_config product
vars:
v2: 'ttt'