Alibaba DataX调研使用
接触DataX是基于公司离线数据同步需求,从而开始接触到DataX的使用。前异构数据之间开源同步工具,主要有Sqoop Sqoop是一款开源的工具,主要用于Hadoop与传统RDBMS之间的数据同步,可以将RDBMS中的数据同步到HDFS中,也可以进行逆向操作。主要是基于MR任务的进行同步,具有支持并发、增量更新、支持海量数据同步等优点。 Sqoop Wiki Sqoo...
接触DataX是基于公司离线数据同步需求,从而开始接触到DataX的使用。前异构数据之间开源同步工具,主要有
Sqoop
Sqoop是一款开源的工具,主要用于Hadoop与传统RDBMS之间的数据同步,可以将RDBMS中的数据同步到HDFS中,也可以进行逆向操作。主要是基于MR任务的进行同步,具有支持并发、增量更新、支持海量数据同步等优点。
Sqoop Wiki
Sqoop官网OGG
Oracle Golden Gate,缩写为OGG,可见其是Oralce“冠名”的同步工具。主要是RDBMS之间的数据同步,常见的关系型数据,例如Oracle/DB2/Sybase/MySQL/MsSQL等均支持。
OGG介绍Kettle
Kettle是一款开源的ETL工具,具有集群模式。Kettle主要用来做ETL数据处理,具有方便快捷、可视化等优点,适合于一般中小业务的使用。- DataX
DataX是由Alibaba开源的一款异构数据同步工具,可以在常见的各种数据源之间进行同步,并仅依赖Java环境,具有轻量、插件式、方便等优点,可以快速完成同步任务。一般公司的数据同步任务,基本可以满足。 - canal+otter
canal+otter数据同步方案,同样是Alibaba开源的同步工具。该方案适用于大规模、扩机房、跨区域等重量级数据同步任务,并具有监控、近实时同步等优点。 - SymmetricDS
同样是一款基于Java开发的分布式开源同步软件,基于复制原理,源于Java的特性,SymmetricDS具有跨平台、多线程、可监控等优点。
博主公司具有实时同步数据与离线同步数据的需求,因此需要调研数据同步工具。其中实时同步主要是从MySQL同步数据到Hive,这里我们选择的是MySQL Binlog方式进行实时同步;离线同步,主要是从各种数据源–如MySQL、ODPS、HIVE等–之间的数据同步。
经过调研发现,DataX具有以下优点
1. 社区活跃,虽然Alibaba没有开源集群模式,仍有许多用户群体;
2. Alibaba出品,有大公司维护;
3. 使用方便,只需要配置JSON即可使用;无需安装,直接解压即可使用;
4. 完善的文档,很快可以上手,学习成本近乎于零。
DataX介绍
DataX是Alibaba推出的异构数据通过方案,通过插件式组合,可以完成不同数据源之间的数据同步任务。DataX可以通过的数据包括目前所见的绝大数,包括关系型数据库[MySQL、Oracle、SQLServer、PostgreSQL等]、阿里云数据仓数据存储[ODPS、ADS、OSS、OCS等]、NoSQL数据库[OTS、Hbase、MongoDB、Hive等]、无结构化数据[TextFile、FTP、HDFS、ES等]。另外DataX提供了方便的扩展实现,可以方便的个性化定制自己的数据源,方便扩展。
目前开源出来的DataX已经发布了3.0版本,增加了许多新的特性,包括
1. 可靠地数据质量监控
2. 丰富的数据转换功能
3. 精准的速度控制
4. 强劲的同步性能
5. 健壮的容错机制
6. 极简的使用体验
更加详细的介绍,可以参考DataX的官方说明:
User Guide
DataX-Introduction
插件开发宝典
目前使用场景
目前我们使用到的场景如下所示
同步的路径为:
1. MySQL->Hive
2. Hive->MySQL
3. ODPS->Hive
4. ODPS->MySQL
5. RDS->Hive
选择DataX还有一个重要的特性就是,DataX是Alibaba提供的数据同步方案,天然的支持阿里云的数据源,不需要我们重新从API开始开发,节省开发成本。
使用体会
在使用DataX的过程中,总体而言遇到的问题较少。目前DataX的主要缺点在于开源出来的DataX,缺少分布式支持,是单机版本,无法充分发挥集群的里面。因此,会存在单机节点存在的各种问题,内存、CPU、网络等问题。
使用一段时间,有一些思考
1. DataX集群模式的实现方式,如何实现?如果集群之后,如何监控?
2. DataX单节点状态时,任务运行性能参数的收集。DataX提供了Hook,可以回调打印一些参数,另外可以加入最后打印的数据,例如
Map<String, Number> preLogStatics = communication.getCounter();
preLogStatics.put("任务启动时刻", startTimeStamp);
preLogStatics.put("任务结束时刻", endTimeStamp);
preLogStatics.put("任务总计耗时", totalCosts);
preLogStatics.put("任务平均流量", byteSpeedPerSecond);
preLogStatics.put("记录写入速度", recordSpeedPerSecond);
preLogStatics.put("读出记录总数", CommunicationTool.getTotalReadRecords(communication));
preLogStatics.put("读写失败总数", CommunicationTool.getTotalErrorRecords(communication));
HookInvoker invoker = new HookInvoker(CoreConstant.DATAX_HOME + "/hook", configuration, preLogStatics);
这样我们收集到更多的运行参数,方便后期对任务的运行作分析,便于改进优化任务的运行参数。
3. DataX.py本质上运行的是Java程序,如果直接调用Engine.entry,如何统计任务运行日志?是否存在一种方式,可以按照线程打印单独一个文件的日志?
参考文章
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)