大数据时代,数据实时同步解决方案的思考—最全的数据同步总结

  • 时间:
  • 浏览:7
  • 来源:幸运快3_快3遗漏_幸运快3遗漏

1、 早期关系型数据库之间的数据同步

1)、全量同步

比如从oracle数据库中同步一张表的数据到Mysql中,通常的做法就是 分页查询源端的表,这个 通过 jdbc的batch 方式 插入到目标表,这个 地方时需注意的是,分页查询时,一定要按照主键id来排序分页,处置重复插入。

2)、基于数据文件导出和导入的全量同步,这个 同步方式 一般只适用于同种数据库之间的同步,可能是不同的数据库,这个 方式 可能会趋于稳定难题。

3)、基于触发器的增量同步

增量同步一般是做实时的同步,早期就是 数据同步就有基于关系型数据库的触发器trigger来做的。

使用触发器实时同步数据的步骤:

A、 基于原表创触发器,触发器中有 insert,modify,delete 并就有类型的操作,数据库的触发器分Before和After并就有清况 ,并就有是在insert,modify,delete 并就有类型的操作趋于稳定事先触发(比如记录日志操作,一般是Before),并就有是在insert,modify,delete 并就有类型的操作事先触发。

B、 创建增量表,增量表中的字段和原表中的字段完整篇 一样,这个 时需多一五个 操作类型字段(分表代表insert,modify,delete 并就有类型的操作),这个 时需一五个 唯一自增ID,代表数据原表中数据操作的顺序,这个 自增id非常重要,不然数据同步就会错乱。

C、 原表中再次出现insert,modify,delete 并就有类型的操作时,通过触发器自动产生增量数据,插入增量表中。

D、处置增量表中的数据,处置时,一定是按照自增id的顺序来处置,这个 效率会非常低,没方式 做批量操作,不然数据会错乱。  村里人 可能会说,是就有还可不还能否 把insert操作合并在并肩,modify合并在并肩,delete操作合并在并肩,这个 批量处置,我给的答案是不行,可能数据的增完整篇 是有顺序的,合并后,就沒有顺序了,同三根数据的增完整篇 顺序一旦错了,那数据同步就肯定错了。

市面上就是 数据etl数据交换产品就有基于这个 思想来做的。

E、 这个 思想使用kettle 很容易就还可不还能否 实现,笔者一五个 在被委托人的博客中写过 kettle的文章,https://www.cnblogs.com/laoqing/p/73100673.html

4)、基于时间戳的增量同步

A、首先你们歌词 都歌词 时需一张临时temp表,用来存取每次读取的待同步的数据,也就是 把每次从原表中根据时间戳读取到数据先插入到临时表中,每次在插入前,先清空临时表的数据

B、你们歌词 都歌词 还时需创建一五个 时间戳配置表,用于存放每次读取的处置完的数据的最后的时间戳。

C、每次从原表中读取数据时,先查询时间戳配置表,这个 就知道了查询原表时的始于了时间戳。

D、根据时间戳读取到原表的数据,插入到临时表中,这个 再将临时表中的数据插入到目标表中。

E、从缓存表中读取出数据的最大时间戳,这个 更新到时间戳配置表中。缓存表的作用就是 使用sql获取每次读取到的数据的最大的时间戳,当然有有哪些就有完整篇 基于sql一段话在kettle中来配置,才时需一五个 的一张临时表。

2、    大数据时代下的数据同步

1)、基于数据库日志(比如mysql的binlog)的同步

你们歌词 都歌词 都知道就是 数据库都支持了主从自动同步,尤其是mysql,还可不还能否 支持多主多从的模式。沒有你们歌词 都歌词 是就有还可不还能否 利用这个 思想呢,答案当然是肯定的,mysql的主从同步的过程是一五个 的。

  A、master将改变记录到二进制日志(binary log)中(有有哪些记录叫做二进制日志事件,binary log events,还可不还能否 通过show binlog events进行查看);

  B、slave将master的binary log events拷贝到它的中继日志(relay log);

  C、slave重做中继日志中的事件,将改变反映它被委托人的数据。

阿里巴巴开源的canal就完美的使用这个 方式 ,canal 伪装了一五个 Slave 去喝Master进行同步。

A、 canal模拟mysql slave的交互协议,伪装被委托人为mysql slave,向mysql master发送dump协议

B、 mysql master收到dump请求,始于了推送binary log给slave(也就是 canal)

C、 canal解析binary log对象(原始为byte流)

另外canal 在设计时,有点硬设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑。

canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample

canal c# 客户端: https://github.com/dotnetcore/CanalSharp

canal go客户端: https://github.com/CanalClient/canal-go

canal php客户端: https://github.com/xingwenge/canal-php、

github的地址:https://github.com/alibaba/canal/

D、在使用canal时,mysql时需开启binlog,这个 binlog-format时需为row,还可不还能否 在mysql的my.cnf文件中增加如下配置

log-bin=E:/mysql5.5/bin_log/mysql-bin.log

binlog-format=ROW

server-id=123、

E、 部署canal的服务端,配置canal.properties文件,这个  启动 bin/startup.sh 或bin/startup.bat

#设置要监听的mysql服务器的地址和端口

canal.instance.master.address = 127.0.0.1:31006

#设置一五个 可访问mysql的用户名和密码并具有相应的权限,本示例用户名、密码都为canal

canal.instance.dbUsername = canal

canal.instance.dbPassword = canal

#连接的数据库

canal.instance.defaultDatabaseName =test

#订阅实例中所有的数据库和表

canal.instance.filter.regex = .*\\..*

#连接canal的端口

canal.port= 11111

#监听到的数据变更发送的队列

canal.destinations= example

F、 客户端开发,在maven中引入canal的依赖

   <dependency>
         <groupId>com.alibaba.otter</groupId>
          <artifactId>canal.client</artifactId>
          <version>1.0.21</version>
      </dependency>

代码示例:

package com.example;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

 
public class CanalClientExample {

    public static void main(String[] args) {
        while (true) {
            //连接canal
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "canal", "canal");
            connector.connect();
            //订阅 监控的 数据库.表
            connector.subscribe("demo_db.user_tab");
            //一次取10条
            Message msg = connector.getWithoutAck(10);

            long batchId = msg.getId();
            int size = msg.getEntries().size();
            if (batchId < 0 || size == 0) {
                System.out.println("沒有消息,休眠5秒");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                //
                CanalEntry.RowChange row = null;
                for (CanalEntry.Entry entry : msg.getEntries()) {
                    try {
                        row = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        List<CanalEntry.RowData> rowDatasList = row.getRowDatasList();
                        for (CanalEntry.RowData rowdata : rowDatasList) {
                            List<CanalEntry.Column> afterColumnsList = rowdata.getAfterColumnsList();
                            Map<String, Object> dataMap = transforListToMap(afterColumnsList);
                            if (row.getEventType() == CanalEntry.EventType.INSERT) {
                                //具体业务操作
                                System.out.println(dataMap);
                            } else if (row.getEventType() == CanalEntry.EventType.UPDATE) {
                                //具体业务操作
                                System.out.println(dataMap);
                            } else if (row.getEventType() == CanalEntry.EventType.DELETE) {
                                List<CanalEntry.Column> beforeColumnsList = rowdata.getBeforeColumnsList();
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    if ("id".equals(column.getName())) {
                                        //具体业务操作
                                        System.out.println("删除的id:" + column.getValue());
                                    }
                                }
                            } else {
                                System.out.println("这个

操作类型不做处置");
                            }

                        }

                    } catch (InvalidProtocolBufferException e) {
                        e.printStackTrace();
                    }
                }
                //确认消息
                connector.ack(batchId);
            }


        }
    }

    public static Map<String, Object> transforListToMap(List<CanalEntry.Column> afterColumnsList) {
        Map map = new HashMap();
        if (afterColumnsList != null && afterColumnsList.size() > 0) {
            for (CanalEntry.Column column : afterColumnsList) {
                map.put(column.getName(), column.getValue());
            }
        }
        return map;
    }


}

2)、基于BulkLoad的数据同步,比如从hive同步数据到hbase

 

你们歌词 都歌词 有并就有方式 还可不还能否 实现,

A、 使用spark任务,通过HQl读取数据,这个 再通过hbase的Api插入到hbase中。

这个 这个 做法,效率很低,这个 大批量的数据并肩插入Hbase,对Hbase的性能影响很大。

在大数据量的清况 下,使用BulkLoad还可不还能否 快速导入,BulkLoad主就是 借用了hbase的存储设计思想,可能hbase本质是存储在hdfs上的一五个 文件夹,这个 底层是以一五个 个的Hfile趋于稳定的。HFile的形式趋于稳定。Hfile的路径格式一般是一五个 的:

/hbase/data/default(默认是这个 ,可能hbase的表沒有指定命名空间一段话,可能指定了,这个 就是 命名空间的名字)/<tbl_name>/<region_id>/<cf>/<hfile_id>

B、 BulkLoad实现的原理就是 按照HFile格式存储数据到HDFS上,生成Hfile还可不还能否 使用hadoop的MapReduce来实现。可能就有hive中的数据,比如实物的数据,沒有你们歌词 都歌词 还可不还能否 将实物的数据生成文件,这个 上传到hdfs中,组装RowKey,这个 将封装后的数据在回写到HDFS上,以HFile的形式存储到HDFS指定的目录中。

 

当然你们歌词 都歌词 也还可不还能否 不事先生成hfile,还可不还能否 使用spark任务直接从hive中读取数据转加带RDD,这个 使用HbaseContext的自动生成Hfile文件,偏离 关键代码如下:

…
//将DataFrame转换bulkload时需的RDD格式
    val rddnew = datahiveDF.rdd.map(row => {
      val rowKey = row.getAs[String](rowKeyField)
 
      fields.map(field => {
        val fieldValue = row.getAs[String](field)
        (Bytes.toBytes(rowKey), Array((Bytes.toBytes("info"), Bytes.toBytes(field), Bytes.toBytes(fieldValue))))
      })
    }).flatMap(array => {
      (array)
    })
…
//使用HBaseContext的bulkload生成HFile文件
    hbaseContext.bulkLoad[Put](rddnew.map(record => {
      val put = new Put(record._1)
      record._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
      put
    }), TableName.valueOf(hBaseTempTable), (t : Put) => putForLoad(t), "/tmp/bulkload")
 
    val conn = ConnectionFactory.createConnection(hBaseConf)
    val hbTableName = TableName.valueOf(hBaseTempTable.getBytes())
    val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn))
    val realTable = conn.getTable(hbTableName)
    HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator)
 
    // bulk load start
    val loader = new LoadIncrementalHFiles(hBaseConf)
    val admin = conn.getAdmin()
    loader.doBulkLoad(new Path("/tmp/bulkload"),admin,realTable,regionLocator)
 
    sc.stop()
  }
…
  def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
    val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList()
    import scala.collection.JavaConversions._
    for (cells <- put.getFamilyCellMap.entrySet().iterator()) {
      val family = cells.getKey
      for (value <- cells.getValue) {
        val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value))
        ret.+=((kfq, CellUtil.cloneValue(value)))
      }
    }
    ret.iterator
  }
}

…

C、pg_bulkload的使用

这是一五个 支持pg库(PostgreSQL)批量导入的插件工具,它的思想也是通过实物文件加载的方式 ,这个 工具笔者沒有亲自去用过,完整篇 的介绍还可不还能否 参考:https://my.oschina.net/u/3317105/blog/852785   pg_bulkload项目的地址:http://pgfoundry.org/projects/pgbulkload/

3)、基于sqoop的全量导入

Sqoop 是hadoop生态中的一五个 工具,专门用于实物数据导入进入到hdfs中,实物数据导出时,支持就是 常见的关系型数据库,也是在大数据中常用的一五个 数据导出导入的交换工具。

 

Sqoop从实物导入数据的流程图如下:

Sqoop将hdfs中的数据导出的流程如下:

本质就有用了大数据的数据分布式处置来快速的导入和导出数据。

4)、HBase中建表,这个 Hive中建一五个 实物表,一五个 当Hive中写入数据后,HBase中也会并肩更新,这个 时需注意

A、hbase中的空cell在hive中会补null

B、hive和hbase中不匹配的字段会补null

你们歌词 都歌词 还可不还能否 在hbase的shell 交互模式下,创建一张hbse表

create 'bokeyuan','zhangyongqing'

使用这个 命令,你们歌词 都歌词 还可不还能否 创建一张叫bokeyuan的表,这个 中间一五个 列族zhangyongqing,hbase创建表时,还可不还能否 不需要指定字段,这个 时需指定表名以及列族

你们歌词 都歌词 还可不还能否 使用的hbase的put命令插入这个 数据

put 'bokeyuan','001','zhangyongqing:name','robot'

put 'bokeyuan','001','zhangyongqing:age','20'

put 'bokeyuan','002','zhangyongqing:name','spring'

put 'bokeyuan','002','zhangyongqing:age','18'

还可不还能否 通过hbase的scan 全表扫描的方式 查看你们歌词 都歌词 插入的数据

scan ' bokeyuan'

你们歌词 都歌词 继续创建一张hive实物表

create external table bokeyuan (id int, name string, age int) 

STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 

WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,user:name,user:age") 

TBLPROPERTIES("hbase.table.name" = " bokeyuan");

实物表创建好了后,你们歌词 都歌词 还可不还能否 使用HQL一段话来查询hive中的数据了

select * from classes;

OK

1 robot 20

2 spring 18

Debezium是一五个 开源项目,为捕获数据更改(change data capture,CDC)提供了一五个 低延迟的流式处置平台。让他安装这个 配置Debezium去监控你的数据库,让他的应用就还可不还能否 消费对数据库的每一五个 行级别(row-level)的更改。不到已提交的更改才是可见的,就是 你的应用不需要担心事务(transaction)可能更改被回滚(roll back)。Debezium为所有的数据库更改事件提供了一五个 统一的模型,就是 你的应用不需要担心每并就有数据库管理系统的错综比较比较复杂。另外,可能Debezium用持久化的、有副本备份的日志来记录数据库数据变化的历史,这个 ,你的应用还可不还能否 随时停止再重启,而不需要错过它停止运行时趋于稳定的事件,保证了所有的事件都能被正确地、完整篇 趋于稳定理掉。

该项目的GitHub地址为:https://github.com/debezium/debezium   这是一五个 开源的项目。

 

  一五个 监控数据库,这个 在数据变动的事先获得通知虽然老就是 一件很比较复杂的事情。关系型数据库的触发器还可不还能否 做到,这个 只对特定的数据库有效,这个 通常不到更新数据库内的清况 (无法和实物的守护守护进程通信)。这个 数据库提供了监控数据变动的API可能框架,这个 没一五个 标准,偏离 数据库的实现方式 就有不同的,这个 时需几瓶特定的知识和理解特定的代码可不还能否 运用。确保以相同的顺序查看和处置所有更改,并肩最小化影响数据库仍然非常具有挑战性。

       Debezium正好提供了模块为你做有有哪些比较复杂的工作。这个 模块是通用的,这个 可不还能否 适用多种数据库管理系统,但在功能和性能方面仍有这个 限制。另这个 模块是为特定的数据库管理系统定制的,就是 你们歌词 都歌词 通常还可不还能否 更多地利用数据库系统并就有的型态来提供更多功能,Debezium提供了对MongoDB,mysql,pg,sqlserver的支持。

Debezium是一五个 捕获数据更改(CDC)平台,这个 利用Kafka和Kafka Connect实现了被委托人的持久性、可靠性和容错性。每一五个 部署在Kafka Connect分布式的、可扩展的、容错性的服务中的connector监控一五个 上游数据库服务器,捕获所有的数据库更改,这个 记录到一五个 可能多个Kafka topic(通常一五个 数据库表对应一五个 kafka topic)。Kafka确保所有有有哪些数据更改事件都可不还能否 多副本这个 总体上有序(Kafka不到保证一五个 topic的单个分区内有序),一五个 ,更多的客户端还可不还能否 独立消费同样的数据更改事件而对上游数据库系统造成的影响降到很小(可能N个应用都直接去监控数据库更改,对数据库的压力为N,而用debezium汇报数据库更改事件到kafka,所有的应用都去消费kafka中的消息,还可不还能否 把对数据库的压力降到1)。另外,客户端还可不还能否 随时停止消费,这个 重启,从上次停止消费的地方接着消费。每个客户端还可不还能否 自行决定你们歌词 都歌词 否有时需exactly-once可能at-least-once消息交付语义保证,这个 所有的数据库可能表的更改事件是按照上游数据库趋于稳定的顺序被交付的。

       对于不时需可能不之前 这个 容错级别、性能、可扩展性、可靠性的应用,你们歌词 都歌词 还可不还能否 使用内嵌的Debezium connector引擎来直接在应用实物运行connector。这个 应用仍时需消费数据库更改事件,但更希望connector直接传递给它,而就有持久化到Kafka里。

更完整篇 的介绍还可不还能否 参考:https://www.jianshu.com/p/f86219b1ab98

datax 是阿里开源的etl 工具,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能,采用java+python进行开发,核心是java语言实现。

github地址:https://github.com/alibaba/DataX    

A、设计架构:

数据交换通过DataX进行中转,任何数据源假如有一天和DataX连接上即还可不还能否 和已实现的任意数据源同步

B、框架

 

核心模块介绍:

  1. DataX完成单个数据同步的作业,你们歌词 都歌词 称之为Job,DataX接受到一五个 Job事先,将启动一五个 守护守护进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一五个 Task一定会负责一偏离 数据的同步工作。
  3. 切分多个Task事先,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一五个 TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一五个 Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的守护守护进程来完成任务同步工作。
  5. DataX作业运行起来事先, Job监控并等候多个TaskGroup模块任务完成,等候所有TaskGroup任务完成后Job成功退出。这个 ,异常退出,守护守护进程退出值非0

DataX调度流程:

举例来说,用户提交了一五个 DataX作业,这个 配置了20个并发,目的是将一五个 100张分表的mysql数据同步到odps中间。 DataX的调度决策思路是:

  1. DataXJob根据分库分表切分成了100个Task。
  2. 根据20个并发,DataX计算共时需分配一五个 TaskGroup。
  3. 一五个 TaskGroup平分切分好的100个Task,每一五个 TaskGroup负责以五个并发共计运行2五个Task。

优势:

  • 偏离 插件就有被委托人的数据转换策略,放置数据失真;
  • 提供作业全链路的流量以及数据量运行时监控,包括作业并就有清况 、数据流量、数据效率、执行进度等。
  • 可能各种导致 导致 传输报错的脏数据,DataX还可不还能否 实现精确的过滤、识别、埋点、展示,为用户提过多种脏数据处置模式;
  • 精确的效率控制
  • 健壮的容错机制,包括守护守护进程实物重试、守护守护进程级别重试;

从插件视角看框架

  • Job:是DataX用来描述从一五个 源头到目的的同步作业,是DataX数据同步的最小业务单元;
  • Task:为最大化而把Job拆分得到最小的执行单元,进行并发执行;
  • TaskGroup:一组Task集合,在同一五个 TaskGroupContainer执行下的Task集合称为TaskGroup;
  • JobContainer:Job执行器,负责Job全局拆分、调度、前置一段话和后置一段话等工作的工作单元。累似 Yarn中的JobTracker;
  • TaskGroupContainer:TaskGroup执行器,负责执行一组Task的工作单元,累似 Yarn中的TAskTacker。

    总之,Job拆分为Task,分别在框架提供的容器中执行,插件只时需实现Job和Task两偏离 逻辑。

    物理执行有并就有运行模式:

  • Standalone:单守护守护进程运行,沒有实物依赖;
  • Local:单守护守护进程运行,统计信息,错误信息汇报到集中存储;
  • Distrubuted:分布式多守护守护进程运行,依赖DataX Service服务;

    总体来说,当JobContainer和TaskGroupContainer运行在同一五个 守护守护进程内的事先就是 单机模式,在不同守护守护进程执行就是 分布式模式。

可能时需开发插件,还可不还能否 看zhege这个 插件开发指南:   https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md 

数据源支持清况 :

类型数据源Reader(读)Writer(写)文档
RDBMS 关系型数据库 MySQL 读 、写
            Oracle         √         √     读 、写
  SQLServer 读 、写
  PostgreSQL 读 、写
  DRDS 读 、写
  通用RDBMS(支持所有关系型数据库) 读 、写
阿里云数仓数据存储 ODPS 读 、写
  ADS  
  OSS 读 、写
  OCS 读 、写
NoSQL数据存储 OTS 读 、写
  Hbase0.94 读 、写
  Hbase1.1 读 、写
  Phoenix4.x 读 、写
  Phoenix5.x 读 、写
  MongoDB 读 、写
  Hive 读 、写
无型态化数据存储 TxtFile 读 、写
  FTP 读 、写
  HDFS 读 、写
  Elasticsearch  
时间序列数据库 OpenTSDB  
  TSDB