背景介绍

随着数据收集的手段不断丰富,数据量也随之增大,而针对这些数据的分析需求也越来越旺盛,旧的RDBMS数据库(如oracle),可能无法承受OLAP的需求;而完全将这些数据迁移到bigdata的方案上,又很难满足ACID等需求。通常比较折衷的方案是,OLTP的应用仍然使用旧的oracle数据库,而OLAP的应用使用bigdata方案,例如hadoop、阿里云ADS等。有的人将这种方案形象的成为“读写分离”。

接下来,要解决数据如何从oracle同步到ADS。一般来说有两种:

  • 批量同步。可以使用开源的kettle,将数据从源端select出来后,再批量insert到目的端。kettle可以做到定时调度,通过定义抽取变量,可以做到每隔一段时间同步一次。目前国内的数据同步软件很多都是在kettle的基础上改的;如果目的端是一些比较特殊的数据库,可以通过为kettle编写插件来支持。
  • 实时同步。显然,批量同步的缺点是同步周期长,无法满足一些对时延要求比较高的情景,需要Oracle GoldenGate这样的软件来做到实时或者准实时的同步。

整体架构

OGG的整体结构略微复杂,官方图描述了一个比较典型的数据同步过程。

Oracle GoldenGate Logical Architecture

  • Manager:管理GoldenGate整体各进程的起动/停止、监视、管理,Trail文件的生成、删除
  • Trail文件:将DB变更信息以逻辑形式存储的、GoldenGate中间文件。配置时需要指定Trail文件的路径。
  • Extract:数据同步的起点。从REDO日志文件获取变更信息,以最小10ms间隔读取Redo日志,将变更信息输出到Trail文件。
  • DataPump:Trail文件可以直接放到目的端机器,也可以先放在本地,然后通过通过tcp连接传给目的端的Collector,发送时数据可以压缩。DataPump也是一种Extract,一般讲上面的Extract叫做primary Extract,将DataPump叫做secondly Extract。我们用的是第二种做法。
  • Collector:收集DataPump发过来的数据库变更信息,生成Trial文件。
  • Replicat:将Trail文件转为SQL语句在目的DB上执行,实现最终数据复制

其中,Manager和Extract、Collector、Replicate通过共享内存来通信;源、目的端的Trial文件复制通过TCP连接。

OGG装好后,源端和目的端都可以使用其目录下的ggsci来配置,不过这个东西不支持上下箭头(貌似是readline授权协议的原因),可以使用rlwrap在外面包一层,具体参考这篇博客

由于我们的目的端是阿里云的ADS,虽然其接入方式为mysql,但并不是一个完整实现,如果直接使用官方OGG的mysql连接来同步数据,甚至连接都建立不起来。所以,我们需要自己来实现一个OGG的插件。 阿里在github上开源了他们为datahub实现的插件aliyun-odps-ogg-plugin。我们暂时还不能开源,不过实现思路基本是一样的,两个插件的作者还曾经是室友。

ADS插件

插件编写方式参见官方说明11.3 Coding a Custom Handler in Java。需要注意一点,使用JAVA自定义插件时,我们会替换掉原来的Replict,而是改用Exract+Plugin(即在目的端再配置一个Extract,类似DataPump,并配置该Extract的同名.properties)。

数据流向:源端写入目的端的Trial文件,Extract从Trail文件中抽取记录并做回放,并将数据传递给JAVA自定义插件,插件再将框架给的数据转为SQL,再发送给ADS执行,完成数据更新。

下面这段代码是从官网上抄的,上面的官方链接中有详细的处理过程说明。

import oracle.goldengate.datasource.*;
import static oracle.goldengate.datasource.GGDataSource.Status;
public class SampleHandler extends AbstractHandler {
        @Override
        public void init(DsConfiguration conf, DsMetaData metaData) {
            super.init(conf, metaData);
            // ... do additional config...
        }
        @Override
        public Status operationAdded(DsEvent e, DsTransaction tx, DsOperation op) { ... }
        @Override
        public Status transactionCommit(DsEvent e, DsTransaction tx) { ... }
        @Override
        public Status metaDataChanged(DsEvent e, DsMetaData meta) { .... }
        @Override
        public void destroy() { /* ... do cleanup ... */ }
        @Override
        public String reportStatus() { return "status report..."; }
}

性能调优

实测发现,数据更新性能很差,我们基于kettle做的产品可以做到8000+的批量同步速度,但是OGG只有300左右,虽然OGG的处理相对复杂一些,但不应该差距如此之大。 优化思路有两个:

  1. 优化插件代码,提高单线程的写入速度
  2. 并发,人多好干事。

下面会详细说明第二个做法。

并发详解

如下图,OGG的DataPump和Replicat都支持配置成并发的模式。实际使用中,由于Replicate的工作相对比较复杂,所以通常会成为瓶颈,需要考虑拆分;而Extract性能比较高,一般不需要考虑优化(有人给的数据是,单个extract进程可处理日志一般为30-50G/小时,单个replicat进程一般只能处理1G队列/小时)。官方的配置参见这里

Oracle GoldenGate Configuration Elements for Data Distribution

Replicate并发 Replicate的并发粒度控制比较细,可以多个表分给不同的Replicate,如果表比较大,也可以将单个表按主键做hash,分配给不同的Replicate。我没有做过Replicate的并发,就不啰嗦了。

Extract并发 等下,刚刚不是说一般不用配置Extract并发吗?对,但是由于我们这里是自定义的JAVA插件,并没有Replicate,所以优化对象就落在JAVA插件上了;而JAVA插件的数据来源,就是目的端上的Extract,因此在目的端配置多个Extract,就可以得到多个并发复制线程啦。

那么,多个并发线程之间,如何保证多条数据之间不会出现时序冲突呢?比如对同一条数据先insert后delete,但多线程处理时delete先到了ADS,这就悲剧了,数据残留(Replicate在多表多Replicate时不会有问题,但对单表多Replicate时也会有同样的问题)。实际上这个问题也很好处理,只要保证同一条数据按主键能分配到同一个线程即可,我们并不关注多条数据的插入顺序,保证最终一致性即可。

下面我直接给出一个配置例子。表很简单,只有3列:c1 int, c2 varchar, c3 varchar,c1为主键。

  1. 目的端配置第1个Extract,取名为adstest1,其param如下:
EXTRACT adstest1
SourceDefs ./dirdef/togg.def
CUserExit libggjava_ue.so CUSEREXIT PassThru IncludeUpdateBefores
GETUPDATEBEFORES
Table ogg_owner.togg , FILTER((c1 \ 3)=0);

解释一下,多条数据分流,靠的就是这里的FILTER。其中’'是取余计算(对,不是mod,也不是%,而且不要丢掉两边的空格)。 FILTER比较强大。这里一开始我用的是SQLPREDICATE,但貌似没什么效果,不太明白。

  1. 配置adstest1这个extract同名的properties文件adstest1.properties

指定改handler的type是JAVA类,SimpleOGG也是框架导入数据的入口。

gg.handlerlist=simpletest
gg.handler.simpletest.type=com.ieevee.dthink.ads.ogg.SimpleOGG
...
  1. 配置第二、三个Extract(adstest2、adstest3),与adstest1不同的是FILTER条件,分别是“=1”、“=2”
  2. start adstest1/2/3
  3. info all查看各进程状态

在这个例子中,我配置了3个并发Extract,源数据根据主键c1对3取模,分为三个线程分别写入到ADS。 如果你的表的主键有多个,或者表的主键不是数值类型,可以通过修改FILTER来做。我想DBA应该比较了解吧 :)

感谢实习生张靓云和包衍同学对这个插件做出的巨大贡献。