阿里云Flink-自定义kafka format实践及踩坑记录(以protobuf为例)

简介: 阿里云Flink-自定义kafka format实践及踩坑记录(以protobuf为例)

1. protobuf简介

Protocol Buffers(简称:ProtoBuf)是一种开源跨平台的序列化数据结构的协议。其对于存储资料或在网络上进行通信的程序是很有用的。这个方法包含一个接口描述语言,描述一些数据结构,并提供程序工具根据这些描述产生代码,这些代码将用来生成或解析代表这些数据结构的字节流。

Google最初开发了Protocol Buffers用于内部使用。Protocol Buffers的设计目标是简单和性能。与XML相比更小且更快。

参考文档:

  1. 维基百科
  2. 官方文档

2. .proto文件

我们使用proto3的语法简单创建了一个名为User的消息体,其中包含了多种int,bool,string以及不同精度的浮点数字段,并将该文件保存为user.proto。

syntax = "proto3";
// proto 的package名称
// package com.example;
// java package名称,如果不指定会默认用proto的package
// option java_package = "com.example";
// 是否编译成多个文件
// option java_multiple_files = true;
// java的封装类的类名
option java_outer_classname = "UserProtoBuf";
message User {
  int32 age = 1;
  int64 timestamp = 2;
  bool enabled = 3;
  float height = 4;
  double weight = 5;
  string userName = 6;
  string Full_Address = 7;
}

3. .proto -> java.class

将.proto文件编译成java类需要用到protoc工具,参考链接:https://protobuf.dev/programming-guides/proto3/#generating,可以通过如下命令生成java类。

protoc --java_out=./ /path/user.proto

生成的java类整体结构如下所示,作为一个嵌套类,类名整体为我们指定的java_outer_classname参数,即UserProtoBuf。

image.png

4. 序列化与反序列化测试

package org.example;
public class UserProtoTest {
    public static void main(String[] args) throws Exception {
        // 通过getClientPush方法将数据序列化
        byte[] byteData = getClientPush();
        System.out.println("数据字节长度:" + byteData.length);
        /**
         * 接收数据反序列化:将字节数据转化为对象数据。
         */
        UserProtoBuf.User user = UserProtoBuf.User.parseFrom(byteData);
        // 看下对象的tostring方法打印出来的内容
        System.out.println("user obj to string:\n" + user);
        // 随便获取下某些字段进行打印
        System.out.println("UserName=" + user.getUserName());
        System.out.println("Timestamp=" + user.getTimestamp());
        System.out.println("Height=" + user.getHeight());
    }
    /**
     * 模拟发送方,将数据序列化后发送
     *
     * @return
     */
    private static byte[] getClientPush() {
        // 按照定义的数据结构,创建一个对象。
        UserProtoBuf.User.Builder user = UserProtoBuf.User.newBuilder();
        user.setAge(18);
        user.setTimestamp(System.currentTimeMillis());
        user.setEnabled(true);
        user.setHeight(1.74F);
        user.setWeight(70.07D);
        user.setUserName("adam");
        user.setFullAddress("China");
        /**
         * 发送数据序列化:将对象数据转化为字节数据输出
         */
        UserProtoBuf.User userBuild = user.build();
        byte[] bytes = userBuild.toByteArray();
        return bytes;
    }
}

image.png

5. 云Kafka读写protobuf测试

测试环境以云kafka为例

5.1. 写Kafka

package org.example;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaProducerProtoTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        //        alikafka_post-cn-wwo3i0eho00o
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<ip1>:9093,<ip2>:9093,<ip3>:9093");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/Users/adamsun/Downloads/mix.4096.client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        //云消息队列 Kafka 版消息的序列化方式。
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        //请求的最长等待时间。
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        //设置客户端内部重试次数。
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        //设置客户端内部重试间隔。
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<user_name>\" password=\"<pass_word>\";");
        //构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可。
        //如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个。
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
        String topic = "sunyf_topic";
        byte[] value = getProtoTestData();
        try {
            //批量获取Future对象可以加快速度,但注意,批量不要太大。
            List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
            ProducerRecord<String, byte[]> kafkaMessage = new ProducerRecord<String, byte[]>(topic, value);
            Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
            futures.add(metadataFuture);
            producer.flush();
            for (Future<RecordMetadata> future : futures) {
                //同步获得Future对象的结果。
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            //客户端内部重试之后,仍然发送失败,业务要应对此类错误。
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }
    private static byte[] getProtoTestData() {
        // 按照定义的数据结构,创建一个对象。
        UserProtoBuf.User.Builder user = UserProtoBuf.User.newBuilder();
        user.setAge(18);
        user.setTimestamp(System.currentTimeMillis());
        user.setEnabled(true);
        user.setHeight(1.88F);
        user.setWeight(66.76D);
        user.setUserName("name");
        user.setFullAddress("addr");
        /**
         * 发送数据序列化:将对象数据转化为字节数据输出
         */
        UserProtoBuf.User userBuild = user.build();
        byte[] bytes = userBuild.toByteArray();
        return bytes;
    }
}

5.2. 读Kafka

package org.example;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
public class KafkaConsumerProtoTest  {
    public static void main(String args[]) {
        Properties props = new Properties();
        //设置接入点,通过控制台获取对应Topic的接入点。
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<ip1>:9093,<ip2>:9093,<ip3>:9093");
        //如果是SSL接入点实例,请注释以下第一行代码。
        //可更加实际拉去数据和客户的版本等设置此值,默认30s。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/Users/adamsun/Downloads/mix.4096.client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
//         * 两次Poll之间的最大允许间隔。
//         * 消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Group移除并触发Rebalance,默认30s。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
//         * 设置单次拉取的量,走公网访问时,该参数会有较大影响。
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
        //每次poll的最大数量。
        //注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿。
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        //消息的反序列化方式
        // 这里注意下value的序列化格式,默认的是string,这里改成了byte的
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        //当前消费实例所属的消费组,请在控制台申请之后填写。
        //属于同一个组的消费实例,会负载消费消息。
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sunyf_group2");
        //如果是SSL接入点实例,请取消以下一行代码的注释。
        //Hostname校验改成空。
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<user_name>\" password=\"<pass_word>\";");
        props.put("auto.offset.reset", "earliest");
        //构造消息对象,也即生成一个消费实例。
        KafkaConsumer<String, byte[]> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]>(props);
        //设置消费组订阅的Topic,可以订阅多个。
        //如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样。
        List<String> subscribedTopics = new ArrayList<String>();
        //如果是SSL接入点实例,请注释以下前五行代码,取消第六行代码的注释。
        //如果需要订阅多个Topic,则在这里add进去即可。
        //每个Topic需要先在控制台进行创建。
        String topicStr = "sunyf_topic";
        String[] topics = topicStr.split(",");
        for (String topic : topics) {
            subscribedTopics.add(topic.trim());
        }
        //subscribedTopics.add(kafkaProperties.getProperty("topic"));
        consumer.subscribe(subscribedTopics);
        //循环消费消息。
        while (true) {
            try {
                ConsumerRecords<String, byte[]> records = consumer.poll(10000);
                //必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG。
                //建议开一个单独的线程池来消费消息,然后异步返回结果。
                for (ConsumerRecord<String, byte[]> record : records) {
                    System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                    byte[] value = record.value();
                    UserProtoBuf.User user = UserProtoBuf.User.parseFrom(value);
                    System.out.println("user=" + user);
                    System.out.println("UserName=" + user.getUserName());
                    System.out.println("Timestamp=" + user.getTimestamp());
                    System.out.println("Height=" + user.getHeight());
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(10000);
                } catch (Throwable ignore) {
                }
                e.printStackTrace();
            }
        }
    }
}

我们通过上述代码,对写入Kafka实例的protobuf消息进行读取,我们在代码中重置了消费位点到最早的位点,"auto.offset.reset" = "earliest",从日志中我们也看到了客户端的这一动作,并可以看到从partition:10,offset:0开始消费,成功消费到一个user message并打印出了其中的部分字段。

image.png

6. 编译打包上传到flink

可以通过IDEA在本地进行打包

  1. 通过flink-quick-start的项目为基础,创建打包环境

archetypeGroupId

org.apache.flink

archetypeArtifactId

flink-quickstart-java

archetypeVersion

1.17.2

  1. 添加flink-protobuf的依赖,如下所示的dependency到pom中
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-protobuf</artifactId>
  <version>1.17.2</version>
</dependency>
  1. 添加protoc的java类到项目中
  2. 使用maven对项目进行打包
  3. 对target文件下的打包后的jar进行检查,是否含有protoc编译后的类。这里使用了官网的测试类,相关参考:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/protobuf/。该proto文件中配置了option java_multiple_files = true; 所以生成了如下三个类:com.example.SimpleTest,com.example.SimpleTestOrBuilder,com.example.SimpleTestOuterClass

image.png

7. flink作业测试

7.1. 上传依赖

image.png

7.2. 作业开发与配置

通过FLINK-SQL创建任务,其中字段类型的映射可以参考:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/protobuf/#data-type-mapping

CREATE TEMPORARY TABLE simple_test (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map map<BIGINT, row<v1 BIGINT, v2 INT>>,
  value_arr array<row<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'sunyf_topic',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = 'sunyf_group_flink',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'protobuf',
  -- 指定消息体对应的message类
  'protobuf.message-class-name' = 'com.example.SimpleTest',
  'protobuf.ignore-parse-errors' = 'true'
)
;
CREATE TEMPORARY TABLE print_sink (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map map<BIGINT, row<v1 BIGINT, v2 INT>>,
  value_arr array<row<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'    
)
;
insert into print_sink
select * from simple_test;

任务发布后我们可以看到taskmanager中的printsinkoutputwriter有相关数据的输出,与我们写入kafka时的数据一致。

image.png

8. flink中对proto2和3版本的差异

空值的差异以及处理

protobuf.read-default-values

只有以proto2编译时,此选项才有效,且默认为false。如果为true,则格式将读取空值作为proto文件中定义的默认值。如果为false,则生成null值。如果proto语法是proto3,则该值将强制设置为true,因为proto3的标准是使用默认值。

关于空值的默认值,可以参考文档:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/protobuf/#null-values

proto2编译

2024-03-25 15:08:55,075 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[null, xxx, null, null, null, null, null, null, null]
2024-03-25 15:08:55,075 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[1, xxx, 2, null, null, null, null, null, WEB]

proto3编译

2024-03-26 16:22:20,411 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[0, xxx, 0, [], 0.0, {}, [], 0, UNIVERSAL]
2024-03-26 16:22:20,412 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[1, xxx, 2, [], 0.0, {}, [], 0, WEB]

9. 相关报错

9.1. com.google.protobuf.DescriptorsFileDescriptorcannotbecasttocom.google.protobuf.DescriptorsFileDescriptorcannotbecasttocom.google.protobuf.DescriptorsFileDescriptor cannot be cast to com.google.protobuf.DescriptorsDescriptor

9.1.1. 表象

相关堆栈报错如下所示:

org.apache.flink.table.api.ValidationException: SQL validation failed. get org.example.UserProtoBuf descriptors error!
  at org.apache.flink.table.sqlserver.utils.FormatValidatorExceptionUtils.newValidationException(FormatValidatorExceptionUtils.java:41)
  at org.apache.flink.table.sqlserver.utils.ErrorConverter.formatException(ErrorConverter.java:125)
  at org.apache.flink.table.sqlserver.utils.ErrorConverter.toErrorDetail(ErrorConverter.java:60)
  at org.apache.flink.table.sqlserver.utils.ErrorConverter.toGrpcException(ErrorConverter.java:54)
  at org.apache.flink.table.sqlserver.FlinkSqlServiceImpl.validateAndGeneratePlan(FlinkSqlServiceImpl.java:1152)
  at org.apache.flink.table.sqlserver.proto.FlinkSqlServiceGrpc$MethodHandlers.invoke(FlinkSqlServiceGrpc.java:3777)
  at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
  at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
  at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
  at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
  at org.apache.flink.table.sqlserver.interceptor.StatusInterceptor$1.onHalfClose(StatusInterceptor.java:116)
  at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
  at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
  at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
  at org.apache.flink.table.sqlserver.interceptor.CancelHandlerRegister$1.onHalfClose(CancelHandlerRegister.java:59)
  at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
  at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:820)
  at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
  at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
  at java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: get org.example.UserProtoBuf descriptors error!
  at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:126)
  at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.<init>(PbRowDataDeserializationSchema.java:58)
  at org.apache.flink.formats.protobuf.PbDecodingFormat.createRuntimeDecoder(PbDecodingFormat.java:45)
  at org.apache.flink.formats.protobuf.PbDecodingFormat.createRuntimeDecoder(PbDecodingFormat.java:32)
  at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(KafkaDynamicSource.java:716)
  at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:385)
  at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:596)
  at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:250)
  at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:180)
  at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
  at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3847)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2716)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2261)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2175)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2120)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:721)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:707)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3693)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:615)
  at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.java:378)
  at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.java:354)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1828)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1504)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:450)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNodeOrFail(SqlToOperationConverter.java:476)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:904)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:423)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:320)
  at org.apache.flink.table.planner.delegation.OperationIterator.convertNext(OperationIterator.java:102)
  at org.apache.flink.table.planner.delegation.OperationIterator.next(OperationIterator.java:86)
  at org.apache.flink.table.planner.delegation.OperationIterator.next(OperationIterator.java:49)
  at org.apache.flink.table.sqlserver.execution.OperationExecutorImpl.validate(OperationExecutorImpl.java:434)
  at org.apache.flink.table.sqlserver.execution.OperationExecutorImpl.validateAndGeneratePlan(OperationExecutorImpl.java:396)
  at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.lambda$validateAndGeneratePlan$30(DelegateOperationExecutor.java:273)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
  at org.apache.flink.table.sqlserver.context.SqlServerSecurityContext.runSecured(SqlServerSecurityContext.java:72)
  at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.wrapClassLoader(DelegateOperationExecutor.java:322)
  at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.lambda$wrapExecutor$36(DelegateOperationExecutor.java:349)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  ... 3 more
Caused by: java.lang.ClassCastException: com.google.protobuf.Descriptors$FileDescriptor cannot be cast to com.google.protobuf.Descriptors$Descriptor
  at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:123)
  ... 43 more

9.1.2. 代码分析

JM报错根因乍看之下具有较强隐蔽性,起初怀疑是包冲突,但provided runtime jar以及shade相关类后仍异常。

结合堆栈以及flink-protobuf-1.17.2.jar中源码:

image.png

及SQL代码:

CREATE TEMPORARY TABLE table_name (
...
)
   WITH (
  'connector' = 'kafka',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.TcTest',
  'protobuf.ignore-parse-errors'='true'
);

及tc_test.proto文件:

syntax = "proto3";
option java_package = "com.example";
message M1 {
...
}
message M2 {
...
}
message M3 {
...
}

及生成的java类:

public final class TcTest {
  public interface M1OrBuilder extends
      // @@protoc_insertion_point(interface_extends:M1)
      com.google.protobuf.MessageOrBuilder {
      ...
  }
  public static final class M1 extends
      com.google.protobuf.GeneratedMessageV3 implements
      // @@protoc_insertion_point(message_implements:M1)
      M1OrBuilder {
      ...
  }
  public interface M2OrBuilder extends
      // @@protoc_insertion_point(interface_extends:M2)
      com.google.protobuf.MessageOrBuilder {
      ...
  }
  public static final class M2 extends
      com.google.protobuf.GeneratedMessageV3 implements
      // @@protoc_insertion_point(message_implements:M2)
      M2OrBuilder {
      ...
  }
}

综合分析,.proto文件中message消息体众多,且option中没有指定java_outer_classname,则java文件名默认改为驼峰命名的方式,即TcTest.java,且为嵌套的java类,内涵所有的M1,M2等所有message消息体,以子类的形式存在,且所有类中都存在getDescriptor方法,分别用于描述proto文件和message文件,但返回类型不同,TcTest类返回的类型为FileDescriptor,而M1等类返回的类型为Descriptor,在Flink SQL配置时仅指定了主类,与编译参数错位,导致类转换异常。拆分proto文件中message到多个文件后编译或指定子类(如com.example.TcTest$M1)后正常消费上游数据。

9.2. org.codehaus.commons.compiler.CompileException: Line 14, Column 30: IDENTIFIER expected instead of '.'

9.2.1. 表象

相关堆栈报错如下所示:

2024-03-23 23:23:38,852 ERROR org.apache.flink.formats.protobuf.util.PbCodegenUtils        [] - Protobuf codegen compile error: 
package org.apache.flink.formats.protobuf.deserialize;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericArrayData;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import com.google.protobuf.ByteString;
public class GeneratedProtoToRow_916e09b8a900477390c1f944e4a36da6{
public static RowData decode(.UserProtoBuf.User message){
RowData rowData=null;
.UserProtoBuf.User message0 = message;
GenericRowData rowData0 = new GenericRowData(7);
Object elementDataVar1 = null;
elementDataVar1 = message0.getAge();
rowData0.setField(0, elementDataVar1);
Object elementDataVar2 = null;
elementDataVar2 = message0.getTimestamp();
rowData0.setField(1, elementDataVar2);
Object elementDataVar3 = null;
elementDataVar3 = message0.getEnabled();
rowData0.setField(2, elementDataVar3);
Object elementDataVar4 = null;
elementDataVar4 = message0.getHeight();
rowData0.setField(3, elementDataVar4);
Object elementDataVar5 = null;
elementDataVar5 = message0.getWeight();
rowData0.setField(4, elementDataVar5);
Object elementDataVar6 = null;
elementDataVar6 = BinaryStringData.fromString(message0.getUserName().toString());
rowData0.setField(5, elementDataVar6);
Object elementDataVar7 = null;
elementDataVar7 = BinaryStringData.fromString(message0.getFullAddress().toString());
rowData0.setField(6, elementDataVar7);
rowData = rowData0;
return rowData;
}
}
2024-03-23 23:23:38,856 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: simple_test[2153] -> Sink: print_sink[2154] (1/1)#0 (c4aaed5ad4c63a8ba82a47979ffce386_717c7b8afebbfb7137f6f0f99beb2a94_0_0) switched from INITIALIZING to FAILED with failure cause:
org.apache.flink.formats.protobuf.PbCodegenException: org.apache.flink.api.common.InvalidProgramException: Program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:124) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.open(DynamicKafkaDeserializationSchema.java:94) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:47) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:144) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:135) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:318) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:778) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:745) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]
Caused by: org.apache.flink.api.common.InvalidProgramException: Program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:262) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    ... 14 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column 30: IDENTIFIER expected instead of '.'
    at org.codehaus.janino.TokenStreamImpl.read(TokenStreamImpl.java:195) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.read(Parser.java:3313) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseQualifiedIdentifier(Parser.java:326) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseReferenceType(Parser.java:2342) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseType(Parser.java:2326) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseFormalParameter(Parser.java:1519) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseFormalParameters(Parser.java:1488) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseMethodDeclarationRest(Parser.java:1392) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseClassBodyDeclaration(Parser.java:938) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseClassBody(Parser.java:736) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseClassDeclarationRest(Parser.java:642) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parsePackageMemberTypeDeclarationRest(Parser.java:370) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseCompilationUnit(Parser.java:241) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:259) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    ... 14 more

9.2.2. 源码分析

通过异常堆栈报错的部分可以看出decode方法的message参数的类型少指定了package,期望应该是例如:com.example.UserProtoBuf.User的形式,但是生成的代码message参数的类型为“.UserProtoBuf.User”

public static RowData decode(.UserProtoBuf.User message){
    ...
}

user.proto文件如下所示,使用protoc编译为UserProtoBuf.java文件后通过IDEA打包成jar并添加到作业依赖中:

syntax = "proto3";
option java_outer_classname = "UserProtoBuf";
message User {
  int32 age = 1;
  int64 timestamp = 2;
  bool enabled = 3;
  float height = 4;
  double weight = 5;
  string userName = 6;
  string Full_Address = 7;
}

Flink SQL:

CREATE TEMPORARY TABLE test (
  ...
) WITH (
  'connector' = 'kafka',
  'topic' = '',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = '',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'org.example.UserProtoBuf$User',
  'protobuf.ignore-parse-errors' = 'true'
)
;

结合堆栈信息

可以定位到异常抛出的方法为:

org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass

image.png

code为org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter类的构造方法ProtoToRowConverter中如下代码调用并传入的,具体的参数值为“codegenAppender.code()”

Class generatedClass = PbCodegenUtils.compileClass(Thread.currentThread().getContextClassLoader(), generatedPackageName + "." + generatedClassName, codegenAppender.code());

codegenAppender对象以及生成的java代码中decode方法message参数的类型是由如下的代码生成的

PbCodegenAppender codegenAppender = new PbCodegenAppender();
codegenAppender.appendSegment("public static RowData decode(" + fullMessageClassName + " message){");

继续追溯fullMessageClassName

// 通过我们传入的类名获取该类的Descriptor对象
Descriptor descriptor = PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
// 通过该Descriptor对象获取 路径+类名 作为decode方法中message的参数类型
String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);

image.png

再继续看PbFormatUtils.getFullJavaName具体是怎么实现的,当前测试类非包装类,会走到else部分的代码块,getOuterProtoPrefix方法会传入自定义protobuf format的FileDescriptor

image.png

getOuterProtoPrefix方法的return部分的代码我们可以看到,在我们这个case中应该是javaPackageName为空导致message的类型为"."开头,没有包含前面的路径(在idea中打包到了org.example中,并在flink sql的参数中进行了指定)。究其原因,javaPackageName的取值在编译的protobuf 类中通过FileDescriptor获取java package或proto package的值(java package为空时)

image.png

但是当前proto文件中,如下两个参数均未指定,而是通过idea对生成的类进行了package的声明,导致无法找到对应的javaPackageName,我们通过local debug同样可以验证我们的推论。

package com.example.xxx.model;
option java_package = "com.example.xxx.model";

image.png

我们可以在protobuf的官网上找到两个参数的相关介绍:https://protobuf.dev/programming-guides/proto3/#options

可以看到java_package与package两个参数的关系如上图getOuterProtoPrefix方法中的逻辑一致,优先为非空的java_package,不然则去proto文件的package参数。

image.png

9.2.3. 问题解决

我们可以按照flink官网的样例,在proto文件中对package以及option java_package两个参数均进行声明,并与idea项目中java打包后的路径一致即可绕过这个问题,让compileClass方法可以通过FileDescriptor找到package名。该问题已经提出issue到社区:https://issues.apache.org/jira/browse/FLINK-35034

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
消息中间件 存储 传感器
333 0
|
8月前
|
消息中间件 SQL 关系型数据库
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
|
9月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
283 11
|
9月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
632 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
533 1
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
418 1
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
838 0
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

相关产品

  • 实时计算 Flink版