本文介绍DTS提供的SDK示例代码的运行流程,并提供示例供您参考。

SDK示例代码下载

下载并解压后,您需要使用文本编辑工具打开pom.xml文件,将SDK的版本修改为最新版本。

说明 您可以在Maven网站中获取最新的数据订阅SDK版本,详情请参见 数据订阅SDK的Maven页面

初始化RegionContext

RegionContext主要用于保存安全认证凭证及网络访问模式的设置,下述代码为您演示如何始化RegionContext

import java.util.List;
import com.aliyun.drc.clusterclient.RegionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MainClass
{
   public static void main(String[] args) throws Exception {
      // 创建RegionContext
      RegionContext context = new RegionContext();
      // 配置阿里云账号的AccessKey ID和AccessKey Secret
      context.setAccessKey("<AccessKey>");
      context.setSecret("<AccessKeySecret>");
       // 是否通过公网连接数据订阅通道
      context.setUsePublicIp(true);
      // 设置传输数据格式为二进制格式
      context.setUseBinary(true);
      // 开启网络优化模式
      context.setDrcNet(true);

        …………
    }
}

初始化ClusterClient

ClusterClient主要用于连接数据订阅通道和接收增量变更数据,下述代码为您演示如何初始化ClusterClient

import java.util.List;
import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
public class MainClass
{
      public static void main(String[] args) throws Exception {
        // 创建RegionContext
          RegionContext context = new RegionContext();
          context.setAccessKey("<AccessKey>");
          context.setSecret("<AccessKeySecret>");
          context.setUsePublicIp(true);

          // 创建ClusterClient
          final ClusterClient client = new DefaultClusterClient(context);

         ……………
    }
}

初始化Listener

Listener通过定义notify函数来接受订阅数据并完成数据消费。下述代码为您演示简易的消费逻辑(将订阅到的增量数据输出至屏幕)。

import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
import com.aliyun.drc.clusterclient.message.ClusterMessage;

public class MainClass
{
    public static void main(String[] args) throws Exception {
        // 初始化RegionContext
        ………
        // 初始化ClusterClient
        ………
        ClusterListener listener = new ClusterListener(){
             @Override
             public void notify(List<ClusterMessage> messages) throws Exception {
                  for (ClusterMessage message : messages) {  
                    // 将订阅到的增量数据输出至屏幕
                      System.out.println(message.getRecord() + ":" + message.getRecord().getTablename() + ":"
                      + message.getRecord().getOpt());  
                      // 完成消费后向DTS服务器发送ACK确认信息(必须调用)
                      message.ackAsConsumed();
              }
      }
     }
}
说明 由于 ackAsConsumed()接口会将SDK消费的最新一条数据的位点及时间戳发送给DTS服务器,如果SDK运行出现故障,再次启动SDK时,会自动从DTS服务器上获取该消费时间点并继续消费,避免消费到重复的数据。

启动ClusterClient

import java.util.List;

import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MainClass
{
   public static void main(String[] args) throws Exception {
    // 初始化RegionContext
    …………
    // 初始化ClusterClient
    …………
    // 初始化ClusterListener
    …………
    // 添加监听者
      client.addConcurrentListener(listener);
      // 设置订阅通道ID
      client.askForGUID("dts_rdsr******_DSF");
      // 启动后台线程(主线程不能退出)
      client.start();
}

在启动ClusterClient之前,需要将Listener添加到ClusterClient中,当ClusterClient从订阅通道中获取到增量数据时,会同步回调Listenernotify函数来执行数据消费。

相关文档

使用SDK示例代码消费订阅数据