DataHub通过DataConnector流转到MaxCompute全链路测试

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 前面通过博客:流数据同步DataConnector测试整理简要介绍了DataConnector的配置。下面通过一个示例,从maxcompute建表开始,介绍整个链路的实现。实现使用Datahub SDK写入数据到Topic,进而经过Dataconnector推送数据到maxcompute的数据表。

概述

前面通过博客:流数据同步DataConnector测试整理简要介绍了DataConnector的配置。下面通过一个示例,从maxcompute建表开始,介绍整个链路的实现。

实验目的

使用Datahub SDK写入数据到Topic,进而经过Dataconnector推送数据到maxcompute的数据表。

实验步骤

1、Dataworks建表SQL脚本

CREATE TABLE IF NOT EXISTS ods_log_tracker( ip STRING COMMENT 'client ip address', user STRING, accesstime string, method STRING COMMENT 'HTTP request type, such as GET POST...', url STRING, protocol STRING, status BIGINT COMMENT 'HTTP reponse code from server', byte_cnt BIGINT, referer STRING, agent STRING) PARTITIONED BY(dt STRING);

_

2、Datahub控制台创建Topic,并关联创建DataConnector

_
_

关于分区选择细节参考链接

3、创建效果
_

4、Java SDK发送信息到Topic

  • pom.xml
        <dependency>
            <groupId>com.aliyun.datahub</groupId>
            <artifactId>aliyun-sdk-datahub</artifactId>
            <version>2.13.0-public</version>
        </dependency>
  • Java Code Sample
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.DatahubClientBuilder;
import com.aliyun.datahub.client.auth.AliyunAccount;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.*;
import com.aliyun.datahub.client.http.HttpConfig;
import com.aliyun.datahub.client.model.*;
import java.util.ArrayList;
import java.util.List;

public class SendData1 {

    public static void main(String[] args) {

        // Endpoint以Region: 华北2为例,其他Region请按实际情况填写
        String endpoint = "http://dh-cn-beijing.aliyuncs.com";
        String accessId = "********";
        String accessKey = "********";
        String projectName = "odpsdemo";  // project项目名称
        String topicName = "ods_log_tracker";  // topic名称
        String shardId = "0";  // 分区ID
        // 创建DataHubClient实例
        DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
                .setDatahubConfig(
                        new DatahubConfig(endpoint,
                                // 是否开启二进制传输,服务端2.12版本开始支持
                                new AliyunAccount(accessId, accessKey), true))
                //专有云使用出错尝试将参数设置为       false
                // HttpConfig可不设置,不设置时采用默认值
                .setHttpConfig(new HttpConfig().setConnTimeout(10000))
                .build();

        // 写入Tuple型数据
        RecordSchema recordSchema = datahubClient.getTopic(projectName, topicName).getRecordSchema();
        // 生成100条数据
        List<RecordEntry> recordEntries = new ArrayList<>();
            for (int i = 0; i < 100; ++i) {
                RecordEntry recordEntry = new RecordEntry();
                TupleRecordData data = new TupleRecordData(recordSchema);
                data.setField("ip","ip");
                data.setField("user","user");
                data.setField("accesstime","accesstime");
                data.setField("method","method");
                data.setField("url","url");
                data.setField("protocol","protocol");
                data.setField("referer","referer");
                data.setField("agent","agent");
                data.setField("dt","dt");
                data.setField("status",1L);
                data.setField("byte_cnt",100L);

                recordEntry.setRecordData(data);
                recordEntry.setShardId(shardId);
                recordEntries.add(recordEntry);
            }
            try {
                // 服务端从2.12版本开始支持,之前版本请使用putRecords接口
                //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
                datahubClient.putRecords(projectName, topicName, recordEntries);
                System.out.println("write data successful");
            } catch (InvalidParameterException e) {
                System.out.println("invalid parameter, please check your parameter");
                System.exit(1);
            } catch (AuthorizationFailureException e) {
                System.out.println("AK error, please check your accessId and accessKey");
                System.exit(1);
            } catch (ResourceNotFoundException e) {
                System.out.println("project or topic or shard not found");
                System.exit(1);
            } catch (ShardSealedException e) {
                System.out.println("shard status is CLOSED, can not write");
                System.exit(1);
            } catch (DatahubClientException e) {
                System.out.println("other error");
                System.out.println(e);
                System.exit(1);
            }
    }
}
  • 写入Topic情况查看
    _

4、数据流转情况查看
_
_

参考链接

DataHub Java SDK介绍
DataWorks 创建业务流程

相关文章
|
2月前
|
消息中间件 分布式计算 大数据
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
119 0
|
2月前
|
分布式计算 Hadoop 大数据
大数据体系知识学习(一):PySpark和Hadoop环境的搭建与测试
这篇文章是关于大数据体系知识学习的,主要介绍了Apache Spark的基本概念、特点、组件,以及如何安装配置Java、PySpark和Hadoop环境。文章还提供了详细的安装步骤和测试代码,帮助读者搭建和测试大数据环境。
61 1
|
2月前
|
SQL 分布式计算 NoSQL
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
44 4
|
2月前
|
运维 监控 数据可视化
大数据-171 Elasticsearch ES-Head 与 Kibana 配置 使用 测试
大数据-171 Elasticsearch ES-Head 与 Kibana 配置 使用 测试
67 1
|
2月前
|
SQL 消息中间件 大数据
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
66 1
|
2月前
|
SQL 大数据 Apache
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
84 1
|
2月前
|
存储 NoSQL 大数据
大数据-51 Redis 高可用方案CAP-AP 主从复制 一主一从 全量和增量同步 哨兵模式 docker-compose测试
大数据-51 Redis 高可用方案CAP-AP 主从复制 一主一从 全量和增量同步 哨兵模式 docker-compose测试
35 3
|
2月前
|
存储 Prometheus NoSQL
大数据-44 Redis 慢查询日志 监视器 慢查询测试学习
大数据-44 Redis 慢查询日志 监视器 慢查询测试学习
26 3
|
2月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
57 3
|
2月前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
42 1