阿里云流式数据服务DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布、订阅和分发功能,让您可以轻松构建基于流式数据的分析和应用。通过数据传输服务DTS(Data
Transmission Service),您可以将DRDS同步至DataHub,可用于流计算等大数据产品对数据进行实时分析等场景。
前提条件
- DataHub实例的地域为华东1、华东2、华北2或华南1。
- DataHub实例中,已创建用作接收同步数据的Project,详情请参见创建Project。
- 待同步的表具备主键或唯一约束。
功能限制
- 仅支持表级别的数据同步。
- 不支持全量数据初始化,即DTS不会将源DRDS实例中同步对象的存量数据同步至目标DataHub实例中。
- 不支持同步DDL操作。如果在数据同步过程中源DRDS执行了DDL操作,将导致数据同步失败。
注意事项
- 请勿在数据同步的过程中变更DRDS实例的规格,否则将导致同步失败。
- 如果需要在数据同步的过程中切换DRDS实例的网络类型,在您执行完网络类型切换操作后,请提交工单调整同步链路的网络连接信息。
支持同步的SQL操作
INSERT、UPDATE、DELETE。
操作步骤
Topic结构定义说明
DTS在将数据变更同步至DataHub实例的Topic时,目标Topic中除了存储变更数据外,还会新增一些附加列用于存储元信息,示例如下。
说明 本案例中的业务字段为id
、name
、address
,由于在配置数据同步时选用的是旧版附加列规则,DTS会为业务字段添加dts_
的前缀。
结构定义说明:
旧版附加列名称 | 新版附加列名称 | 说明 |
---|---|---|
dts_record_id |
new_dts_sync_dts_record_id |
增量日志的记录id,为该日志唯一标识。
|
dts_operation_flag |
new_dts_sync_dts_operation_flag |
操作类型,取值:
|
dts_instance_id |
new_dts_sync_dts_instance_id |
数据库的server id。 |
dts_db_name |
new_dts_sync_dts_db_name |
数据库名称。 |
dts_table_name |
new_dts_sync_dts_table_name |
表名。 |
dts_utc_timestamp |
new_dts_sync_dts_utc_timestamp |
操作时间戳,即binlog的时间戳(UTC 时间)。 |
dts_before_flag |
new_dts_sync_dts_before_flag |
所有列的值是否更新前的值,取值:Y或N。 |
dts_after_flag |
new_dts_sync_dts_after_flag |
所有列的值是否更新后的值,取值:Y或N。 |
关于dts_before_flag和dts_after_flag的补充说明
对于不同的操作类型,增量日志中的dts_before_flag
和dts_after_flag
定义如下:
- INSERT
当操作类型为INSERT时,所有列的值为新插入的值,即为更新后的值,所以
dts_before_flag
取值为N,dts_after_flag
取值为Y,示例如下。 - UPDATE
当操作类型为UPDATE时,DTS会将UPDATE操作拆为两条增量日志。这两条增量日志的
dts_record_id
、dts_operation_flag
及dts_utc_timestamp
对应的值相同。第一条增量日志记录了更新前的值,所以
dts_before_flag
取值为Y,dts_after_flag
取值为N。第二条增量日志记录了更新后的值,所以dts_before_flag
取值为N,dts_after_flag
取值为Y,示例如下。 - DELETE
当操作类型为DELETE时,增量日志中所有的列值为被删除的值,即列值不变,所以
dts_before_flag
取值为Y,dts_after_flag
取值为N,示例如下。
后续操作
配置完数据同步作业后,您可以对同步到DataHub实例中的数据执行计算分析。更多详情,请参见阿里云实时计算。
原创文章,作者:网友投稿,如若转载,请注明出处:https://www.cloudads.cn/archives/33797.html