
Big data / Realtime Compute: Flink reading DataHub as a data source. Can someone share a reliable demo? Not a single document runs end to end.

Realtime Compute platform version: v3.16.2r-blink-SP002
Flink version: blink-1.6.4

Please provide a complete, working demo instead of pointing me back to the documentation. I have gone through every Flink-reads-DataHub demo I could find and not one of them runs; they all fail with various errors. Several support engineers sent me the technical docs, which did not help either. Our company purchased the Realtime Compute and DataHub products, and the single document provided does not work: following it, I have not managed to get even one Flink job running.

Demo Git address: https://github.com/alibaba/alibaba-flink-connectors/blob/flink-1.5.2-compatible/datahub-connector

Error 1:

2024-09-22 11:48:43,791 ERROR [Topology-0 (1/1)] org.apache.flink.streaming.runtime.tasks.StreamTask           - Could not execute the task Source: Custom Source -> Flat Map -> Sink: Unnamed (1/1), aborting the execution
java.lang.NoSuchMethodError: org.apache.flink.api.common.state.OperatorStateStore.getUnionListState(Lorg/apache/flink/api/common/state/ListStateDescriptor;)Lorg/apache/flink/api/common/state/ListState;
    at com.alibaba.flink.connectors.common.source.AbstractDynamicParallelSource.initializeState(AbstractDynamicParallelSource.java:115)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:289)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:859)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:292)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:833)
    at java.lang.Thread.run(Thread.java:834)
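This NoSuchMethodError usually points to a binary mismatch: the connector branch above is built against open-source Flink 1.5.2, while the OperatorStateStore class actually loaded at run time comes from a different Flink/Blink build (or from a second Flink version packaged into the job jar). Below is a small diagnostic sketch, assuming it is run on the same classpath as the failing job, to see which jar supplies OperatorStateStore and which getUnionListState overloads it really declares:

import java.lang.reflect.Method;

import org.apache.flink.api.common.state.OperatorStateStore;

public class StateApiCheck {
    public static void main(String[] args) {
        Class<?> clazz = OperatorStateStore.class;
        // Where the class was loaded from (job fat jar vs. the cluster's Flink/Blink libraries).
        System.out.println("Loaded from: "
                + clazz.getProtectionDomain().getCodeSource().getLocation());
        // List every getUnionListState overload visible at run time, with its exact signature.
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals("getUnionListState")) {
                System.out.println(m);
            }
        }
    }
}

If the printed signatures differ from the one the connector was compiled against, aligning the job's Flink dependencies with the cluster version (typically by marking them provided) is the usual direction for a fix.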

Error 2:

Caused by: java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/cache/CacheLoader
    at com.alibaba.flink.connectors.datahub.datastream.source.DatahubSourceFunction.createProvider(DatahubSourceFunction.java:269)
    at com.alibaba.flink.connectors.datahub.datastream.source.DatahubSourceFunction.init(DatahubSourceFunction.java:115)
    at com.alibaba.flink.connectors.datahub.datastream.example.DatahubSourceFunctionExample.runExample(DatahubSourceFunctionExample.java:46)
    at com.alibaba.flink.connectors.datahub.datastream.example.DatahubSourceFunctionExample.main(DatahubSourceFunctionExample.java:68)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
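This NoClassDefFoundError refers to the relocated Guava 18 classes that Flink ships in its flink-shaded-guava artifact, so it typically means those classes are missing from the classpath at run time, for example when the example is launched locally while the shaded Flink dependencies in the pom are marked provided. A quick hedged check (the class name below is copied from the stack trace):

public class ShadedGuavaCheck {
    public static void main(String[] args) {
        String className =
                "org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader";
        try {
            // Succeeds only if the relocated Guava 18 classes are on the classpath.
            Class<?> clazz = Class.forName(className);
            System.out.println("Found in: "
                    + clazz.getProtectionDomain().getCodeSource().getLocation());
        } catch (ClassNotFoundException e) {
            System.out.println(className
                    + " is not on the classpath; the flink-shaded-guava (Guava 18) classes are missing.");
        }
    }
}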

The code is as follows:

import java.io.Serializable;
import java.util.List;

// DatahubSourceFunction comes from the datahub-connector module linked above.
import com.alibaba.flink.connectors.datahub.datastream.source.DatahubSourceFunction;
// DataHub record model classes; the package may differ between aliyun-sdk-datahub
// and datahub-client-library versions.
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.TupleRecordData;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DatahubSourceFunctionExample implements Serializable {

    // DataHub connection settings; fill these in before running.
    private String endPoint = "";
    private String accessId = "";
    private String accessKey = "";
    private String projectName = "";
    private String topicName = "";

    public void runExample() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // The numeric arguments bound the read range and tune the source; see the
        // DatahubSourceFunction constructor in the connector source for their exact meaning.
        DatahubSourceFunction datahubSource =
                new DatahubSourceFunction(endPoint, projectName, topicName, accessId, accessKey, 0,
                                        Long.MAX_VALUE, 1, 1, 1);
        // The source emits batches of RecordEntry objects; flatten each batch into (String, Long) tuples.
        env.addSource(datahubSource).flatMap(
                (FlatMapFunction<List<RecordEntry>, Tuple2<String, Long>>) (recordEntries, collector) -> {
            for (RecordEntry recordEntry : recordEntries) {
                collector.collect(getStringLongTuple2(recordEntry));
            }
        }).returns(new TypeHint<Tuple2<String, Long>>() {}).print();
        env.execute();
    }

    private Tuple2<String, Long> getStringLongTuple2(RecordEntry recordEntry) {
        Tuple2<String, Long> tuple2 = new Tuple2<>();
        // The topic schema is expected to have a STRING field at index 0 and a BIGINT field at index 1.
        TupleRecordData recordData = (TupleRecordData) (recordEntry.getRecordData());
        tuple2.f0 = (String) recordData.getField(0);
        tuple2.f1 = (Long) recordData.getField(1);
        return tuple2;
    }

    public static void main(String[] args) throws Exception {
        DatahubSourceFunctionExample sourceFunctionExample = new DatahubSourceFunctionExample();
        sourceFunctionExample.runExample();
    }
}
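For comparison, below is a minimal sketch, my own variant rather than anything taken from the connector documentation, of the same record unpacking written as a named FlatMapFunction. It drops the casted lambda and the .returns(...) type hint, and it assumes the same RecordEntry/TupleRecordData classes and the same topic schema (a STRING field at index 0, a BIGINT field at index 1) as the example above.

import java.util.List;

// Assumed DataHub client model package; adjust if your SDK version uses a different one.
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.TupleRecordData;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class RecordEntryFlatMap implements FlatMapFunction<List<RecordEntry>, Tuple2<String, Long>> {
    @Override
    public void flatMap(List<RecordEntry> records, Collector<Tuple2<String, Long>> out) {
        for (RecordEntry record : records) {
            // Each RecordEntry carries a TupleRecordData whose fields follow the topic schema.
            TupleRecordData data = (TupleRecordData) record.getRecordData();
            out.collect(Tuple2.of((String) data.getField(0), (Long) data.getField(1)));
        }
    }
}

It would be wired in as env.addSource(datahubSource).flatMap(new RecordEntryFlatMap()).print().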

pom file (relevant dependencies):


<dependency>
            <groupId>com.aliyun.datahub</groupId>
            <artifactId>aliyun-sdk-datahub</artifactId>
            <version>2.12.2-public</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>jcl-over-slf4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>jul-slf4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <!--<exclusion>-->
                    <!--<artifactId>jackson-databind</artifactId>-->
                    <!--<groupId>com.fasterxml.jackson.core</groupId>-->
                <!--</exclusion>-->
                <!--<exclusion>-->
                    <!--<artifactId>jackson-annotations</artifactId>-->
                    <!--<groupId>com.fasterxml.jackson.core</groupId>-->
                <!--</exclusion>-->
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba.flink</groupId>
            <artifactId>aliyun-connectors-common</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <!--necessary for datahub example run-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-netty</artifactId>
            <version>4.0.27.Final-2.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.datahub</groupId>
            <artifactId>datahub-client-library</artifactId>
            <version>1.1.12-public</version>
        </dependency>

游客74kv2ffqx6rmu  2024-09-22 15:47:36
0 replies
