1、模版获取

https://github.com/qkfm-97/nifi-example/blob/main/etl/oracle-to-greenplum.xml

2、任务说明

将oracle的test_car表数据先全量导入到greenplum中的public.gp_test_car中,然后半小时增量同步一次

具体的字段对应和处理如下

源表(test_car)目标表(public.gp_test_car)备注
unidid
carnamename
carsizeclzd
createtimerksj
cartypescllx
cllx_cn

cllx字典对应的

中文

fzsjfzsj

原表是varchar类型

时间格式为

yyyyMMddhhmmss

需要转换为目标表的timestamp格式

gp_rksj

入目标表的

入库时间,

默认是now()

3、流程截图

4、重点说明

  • 字典缓存转换
  1. 字典数据加载

    该组件的作用是将字典数据(例如车辆类型与其中文名称的映射)存储到缓存中,以便后续流程中使用。

    1. PutDistributedMapCache:将数据字典加载到分布式缓存中。
  2. 数据处理和字典转换

    该组件的作用是根据输入数据的键值(例如车辆类型代码),从缓存中获取相应的字典数据(例如中文名称)。

    1. FetchDistributedMapCache:从缓存中获取字典数据。
  3. 字典数据合并

    1. UpdateAttribute:使用${allMatchingAttributes("cllx_cn_.*"):join(",")}将多个字典合并成一个。

字典表数据如下所示

  • 合并多个flowfile成一个csv,可以通过MergeRecord组件

  

   temple_AvroSchemaRegistry的properties说明其中流程中的test_car_schema 可以按如下方式方便获取

      

合并后的csv样子

  • 多种入greenplum方式测试

       A、创建外表方式

       

--创建外表 这边使用gpfdist协议方式的外表
--如果使用hdfs方式就要把数据传到hdfs上
--外表协议 https://docs-cn.greenplum.org/v5/admin_guide/load/topics/g-working-with-file-based-ext-tables.html

drop external table if exists ext.gp_test_car;
 create external table ext.gp_test_car (like PUBLIC.gp_test_car) LOCATION ('gpfdist://ip:8080/gp_test_car.csv') FORMAT 'CSV' (HEADER DELIMITER '|')


--保证主键不重复先删除
	delete from public.gp_test_car t where exists (select 1 from ext.gp_test_car where id = t.id)


--在把外表数据插入,不插入gp_rksj字段
insert into public.gp_test_car(id,name,clzd,rksj,cllx,cllx_cn,fzsj) select id,name,clzd,rksj,cllx,cllx_cn,fzsj from ext.gp_test_car


       B、使用copy方式

        

--将csv文件上传到主节点的路径下,使用copy方式导入,速度快
copy public.gp_test_car(id,name,clzd,rksj,cllx,cllx_cn,fzsj) FROM '/data/nifi/gp_test_car.csv' WITH header csv delimiter '|'

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐