本文介绍DTS提供的SDK示例代码的运行流程,并提供示例供您参考。
SDK示例代码下载
- RDS MySQL:DtsSubscribeDemo
- DRDS:drds_demo
下载并解压后,您需要使用文本编辑工具打开pom.xml文件,将SDK的版本修改为最新版本。
说明 您可以在Maven网站中获取最新的数据订阅SDK版本,详情请参见
数据订阅SDK的Maven页面。
初始化RegionContext
RegionContext
主要用于保存安全认证凭证及网络访问模式的设置,下述代码为您演示如何始化RegionContext
。
import java.util.List; import com.aliyun.drc.clusterclient.RegionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MainClass { public static void main(String[] args) throws Exception { // 创建RegionContext RegionContext context = new RegionContext(); // 配置阿里云账号的AccessKey ID和AccessKey Secret context.setAccessKey("<AccessKey>"); context.setSecret("<AccessKeySecret>"); // 是否通过公网连接数据订阅通道 context.setUsePublicIp(true); // 设置传输数据格式为二进制格式 context.setUseBinary(true); // 开启网络优化模式 context.setDrcNet(true); ………… } }
初始化ClusterClient
ClusterClient
主要用于连接数据订阅通道和接收增量变更数据,下述代码为您演示如何初始化ClusterClient
。
import java.util.List; import com.aliyun.drc.clusterclient.ClusterClient; import com.aliyun.drc.clusterclient.DefaultClusterClient; import com.aliyun.drc.clusterclient.RegionContext; public class MainClass { public static void main(String[] args) throws Exception { // 创建RegionContext RegionContext context = new RegionContext(); context.setAccessKey("<AccessKey>"); context.setSecret("<AccessKeySecret>"); context.setUsePublicIp(true); // 创建ClusterClient final ClusterClient client = new DefaultClusterClient(context); …………… } }
初始化Listener
Listener
通过定义notify
函数来接受订阅数据并完成数据消费。下述代码为您演示简易的消费逻辑(将订阅到的增量数据输出至屏幕)。
import com.aliyun.drc.clusterclient.ClusterClient; import com.aliyun.drc.clusterclient.ClusterListener; import com.aliyun.drc.clusterclient.DefaultClusterClient; import com.aliyun.drc.clusterclient.RegionContext; import com.aliyun.drc.clusterclient.message.ClusterMessage; public class MainClass { public static void main(String[] args) throws Exception { // 初始化RegionContext ……… // 初始化ClusterClient ……… ClusterListener listener = new ClusterListener(){ @Override public void notify(List<ClusterMessage> messages) throws Exception { for (ClusterMessage message : messages) { // 将订阅到的增量数据输出至屏幕 System.out.println(message.getRecord() + ":" + message.getRecord().getTablename() + ":" + message.getRecord().getOpt()); // 完成消费后向DTS服务器发送ACK确认信息(必须调用) message.ackAsConsumed(); } } } }
说明 由于
ackAsConsumed()
接口会将SDK消费的最新一条数据的位点及时间戳发送给DTS服务器,如果SDK运行出现故障,再次启动SDK时,会自动从DTS服务器上获取该消费时间点并继续消费,避免消费到重复的数据。
启动ClusterClient
import java.util.List; import com.aliyun.drc.clusterclient.ClusterClient; import com.aliyun.drc.clusterclient.ClusterListener; import com.aliyun.drc.clusterclient.DefaultClusterClient; import com.aliyun.drc.clusterclient.RegionContext; import com.aliyun.drc.clusterclient.message.ClusterMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MainClass { public static void main(String[] args) throws Exception { // 初始化RegionContext ………… // 初始化ClusterClient ………… // 初始化ClusterListener ………… // 添加监听者 client.addConcurrentListener(listener); // 设置订阅通道ID client.askForGUID("dts_rdsr******_DSF"); // 启动后台线程(主线程不能退出) client.start(); }
在启动ClusterClient
之前,需要将Listener
添加到ClusterClient
中,当ClusterClient
从订阅通道中获取到增量数据时,会同步回调Listener
的notify
函数来执行数据消费。