DataHub通过DataConnector流转到MaxCompute全链路测试

简介: 前面通过博客:流数据同步DataConnector测试整理简要介绍了DataConnector的配置。下面通过一个示例,从maxcompute建表开始,介绍整个链路的实现。实现使用Datahub SDK写入数据到Topic,进而经过Dataconnector推送数据到maxcompute的数据表。

概述

前面通过博客:流数据同步DataConnector测试整理简要介绍了DataConnector的配置。下面通过一个示例,从maxcompute建表开始,介绍整个链路的实现。

实验目的

使用Datahub SDK写入数据到Topic,进而经过Dataconnector推送数据到maxcompute的数据表。

实验步骤

1、Dataworks建表SQL脚本

CREATE TABLE IF NOT EXISTS ods_log_tracker( ip STRING COMMENT 'client ip address', user STRING, accesstime string, method STRING COMMENT 'HTTP request type, such as GET POST...', url STRING, protocol STRING, status BIGINT COMMENT 'HTTP reponse code from server', byte_cnt BIGINT, referer STRING, agent STRING) PARTITIONED BY(dt STRING);

_

2、Datahub控制台创建Topic,并关联创建DataConnector

_
_

关于分区选择细节参考链接

3、创建效果
_

4、Java SDK发送信息到Topic

  • pom.xml
        <dependency>
            <groupId>com.aliyun.datahub</groupId>
            <artifactId>aliyun-sdk-datahub</artifactId>
            <version>2.13.0-public</version>
        </dependency>
  • Java Code Sample
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.DatahubClientBuilder;
import com.aliyun.datahub.client.auth.AliyunAccount;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.*;
import com.aliyun.datahub.client.http.HttpConfig;
import com.aliyun.datahub.client.model.*;
import java.util.ArrayList;
import java.util.List;

public class SendData1 {

    public static void main(String[] args) {

        // Endpoint以Region: 华北2为例,其他Region请按实际情况填写
        String endpoint = "http://dh-cn-beijing.aliyuncs.com";
        String accessId = "********";
        String accessKey = "********";
        String projectName = "odpsdemo";  // project项目名称
        String topicName = "ods_log_tracker";  // topic名称
        String shardId = "0";  // 分区ID
        // 创建DataHubClient实例
        DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
                .setDatahubConfig(
                        new DatahubConfig(endpoint,
                                // 是否开启二进制传输,服务端2.12版本开始支持
                                new AliyunAccount(accessId, accessKey), true))
                //专有云使用出错尝试将参数设置为       false
                // HttpConfig可不设置,不设置时采用默认值
                .setHttpConfig(new HttpConfig().setConnTimeout(10000))
                .build();

        // 写入Tuple型数据
        RecordSchema recordSchema = datahubClient.getTopic(projectName, topicName).getRecordSchema();
        // 生成100条数据
        List<RecordEntry> recordEntries = new ArrayList<>();
            for (int i = 0; i < 100; ++i) {
                RecordEntry recordEntry = new RecordEntry();
                TupleRecordData data = new TupleRecordData(recordSchema);
                data.setField("ip","ip");
                data.setField("user","user");
                data.setField("accesstime","accesstime");
                data.setField("method","method");
                data.setField("url","url");
                data.setField("protocol","protocol");
                data.setField("referer","referer");
                data.setField("agent","agent");
                data.setField("dt","dt");
                data.setField("status",1L);
                data.setField("byte_cnt",100L);

                recordEntry.setRecordData(data);
                recordEntry.setShardId(shardId);
                recordEntries.add(recordEntry);
            }
            try {
                // 服务端从2.12版本开始支持,之前版本请使用putRecords接口
                //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
                datahubClient.putRecords(projectName, topicName, recordEntries);
                System.out.println("write data successful");
            } catch (InvalidParameterException e) {
                System.out.println("invalid parameter, please check your parameter");
                System.exit(1);
            } catch (AuthorizationFailureException e) {
                System.out.println("AK error, please check your accessId and accessKey");
                System.exit(1);
            } catch (ResourceNotFoundException e) {
                System.out.println("project or topic or shard not found");
                System.exit(1);
            } catch (ShardSealedException e) {
                System.out.println("shard status is CLOSED, can not write");
                System.exit(1);
            } catch (DatahubClientException e) {
                System.out.println("other error");
                System.out.println(e);
                System.exit(1);
            }
    }
}
  • 写入Topic情况查看
    _

4、数据流转情况查看
_
_

参考链接

DataHub Java SDK介绍
DataWorks 创建业务流程

相关文章
|
8月前
|
人工智能 数据可视化 测试技术
AI测试平台自动遍历:低代码也能玩转全链路测试
AI测试平台的自动遍历功能,通过低代码配置实现Web和App的自动化测试。用户只需提供入口链接或安装包及简单配置,即可自动完成页面结构识别、操作验证,并生成可视化报告,大幅提升测试效率,特别适用于高频迭代项目。
|
监控 测试技术 数据库连接
RunnerGo API 性能测试实战:从问题到解决的全链路剖析
API性能测试是保障软件系统稳定性与用户体验的关键环节。本文详细探讨了使用RunnerGo全栈测试平台进行API性能测试的全流程,涵盖测试计划创建、场景设计、执行分析及优化改进。通过电商平台促销活动的实际案例,展示了如何设置测试目标、选择压测模式并分析结果。针对发现的性能瓶颈,提出了代码优化、数据库调优、服务器资源配置和缓存策略等解决方案。最终,系统性能显著提升,满足高并发需求。持续关注与优化API性能,对系统稳定运行至关重要。
|
9月前
|
人工智能 缓存 监控
大模型性能测试实战指南:从原理到落地的全链路解析
本文系统解析大模型性能测试的核心方法,涵盖流式响应原理、五大关键指标(首Token延迟、吐字率等)及测试策略,提供基于Locust的压测实战方案,并深入性能瓶颈分析与优化技巧。针对多模态新挑战,探讨混合输入测试与资源优化
|
Web App开发 前端开发 安全
前端研发链路之测试
本文由前端徐徐撰写,介绍了前端测试的重要性及其主要类型,包括单元测试、E2E测试、覆盖率测试、安全扫描和自动化测试。文章详细讲解了每种测试的工具和应用场景,并提供了选择合适测试策略的建议,帮助开发者提高代码质量和用户体验。
459 3
前端研发链路之测试
|
人工智能 分布式计算 DataWorks
首批!阿里云 MaxCompute 完成中国信通院数据智能平台专项测试
2024年5月31日,在中国信通院组织的首批数据智能平台专项测试中,阿里云数据智能平台解决方案(MaxCompute、DataWorks、PAI)顺利完成测试。
771 5
首批!阿里云 MaxCompute 完成中国信通院数据智能平台专项测试
|
Java 测试技术
SpringBoot单元测试快速写法问题之区分链路环节是否应该被Mock如何解决
SpringBoot单元测试快速写法问题之区分链路环节是否应该被Mock如何解决
|
机器学习/深度学习 分布式计算 大数据
MaxCompute产品使用合集之如何让冒烟测试能够立即执行
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
Java 测试技术 API
SpringBoot单元测试快速写法问题之确定链路上的Mock点如何解决
SpringBoot单元测试快速写法问题之确定链路上的Mock点如何解决
|
数据处理 Perl
SFP光口IBERT链路误码测试
LogiCORE IBERT IP核是Xilinx提供的集成式误码率测试IP核,该IP核产生测试样式,由发送端发出测试样式,经接收端接收测试样式并进行误码检测、分析,以检测Xilinx器件内部高速串行收发器的收发性能。由IBERT IP生成的测试工程会提供一个图形化测试界面,方便用户直观控制和检测高速串行收发器的参数指标。 XQ6657Z35-EVM 评估板SFP光口IBERT链路误码测试的运行效果。IBERT链路误码测试例程两个,分别用于光口运行在5Gbps和10Gbps两种线路速率情形下的误码统计和眼图测试。
SFP光口IBERT链路误码测试
|
存储 分布式计算 大数据
首批!阿里云MaxCompute完成中国信通院基于无服务器架构大数据平台测试
近日,阿里云计算有限公司MaxCompute产品顺利完成中国信通院首批无服务器架构(Serverless)大数据平台测试。
980 7