Question 1: A question about the filesystem connector
With Flink 1.11's filesystem connector, the partition commit trigger [1] is left at its default values, so a partition can be committed multiple times. The scenario: Kafka data is consumed and written to HDFS, with the partition fields day + hour extracted from the event time. If data arrives late, say it is now 19:00 and a record from 17:00 comes in, will that record still be written correctly into the 17:00 partition? Will it go into the 19:00 partition instead? Or will it be dropped? Has anyone verified this in practice? Thanks.
Here is the simplified SQL:

CREATE TABLE kafka (
  a STRING,
  b STRING,
  c BIGINT,
  process_time BIGINT,
  e STRING,
  f STRING,
  g STRING,
  h INT,
  i STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic',
  'properties.bootstrap.servers' = 'x',
  'properties.group.id' = 'test-1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'properties.flink.partition-discovery.interval-millis' = '300000'
);
CREATE TABLE filesystem (
  day STRING,
  hour STRING,
  a STRING,
  b STRING,
  c BIGINT,
  d BIGINT,
  e STRING,
  f STRING,
  g STRING,
  h INT,
  i STRING
) PARTITIONED BY (day, hour) WITH (
  'connector' = 'filesystem',
  'format' = 'parquet',
  'path' = 'hdfs://xx',
  'parquet.compression' = 'SNAPPY',
  'sink.partition-commit.policy.kind' = 'success-file'
);
INSERT INTO filesystem
SELECT
  from_unixtime(process_time, 'yyyy-MM-dd') AS day,
  from_unixtime(process_time, 'HH') AS hour,
  a, b, c,
  process_time AS d,  -- the source table has no column d; process_time appears to be the intended value
  e, f, g, h, i
FROM kafka;
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger

(From the volunteer-compiled Flink mailing list archive)
Reference answer:
As I understand it, with partition-time commit a partition is committed once current watermark > partition time + commit delay, so it depends on how long your sink.partition-commit.delay is set to. Once that point has passed, I believe late data is dropped by default.
https://cloud.tencent.com/developer/article/1707182
You can take a look at this link. (From the volunteer-compiled Flink mailing list archive)
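For reference, here is a minimal sketch of what a partition-time commit setup could look like. The table names, the watermark column ts, and the 5-minute watermark delay are illustrative additions, not from the original question; the option names follow the 1.11 filesystem connector documentation in [1].

CREATE TABLE kafka_src (
  a STRING,
  process_time BIGINT,
  -- event-time attribute derived from the epoch-second field (illustrative)
  ts AS TO_TIMESTAMP(FROM_UNIXTIME(process_time)),
  WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic',
  'properties.bootstrap.servers' = 'x',
  'properties.group.id' = 'test-1',
  'format' = 'json'
);

CREATE TABLE fs_sink (
  a STRING,
  day STRING,
  hour STRING
) PARTITIONED BY (day, hour) WITH (
  'connector' = 'filesystem',
  'format' = 'parquet',
  'path' = 'hdfs://xx',
  -- commit a partition once the watermark passes partition time + delay
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 h',
  'partition.time-extractor.timestamp-pattern' = '$day $hour:00:00',
  'sink.partition-commit.policy.kind' = 'success-file'
);

INSERT INTO fs_sink
SELECT a, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH')
FROM kafka_src;

With the default 'process-time' trigger no watermark is involved; the 'partition-time' trigger above is what the "current watermark > partition time + commit delay" rule in the answer refers to.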
For more answers to this question, see:
https://developer.aliyun.com/ask/364541?spm=a2c6h.13066369.question.21.6ad263827fpfa7
Question 2: Flink SQL Kafka-to-MySQL INSERT syntax and unique primary key issues
I am using Flink 1.11.0. The code is as follows:

StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);

tableEnvironment.executeSql(
    " CREATE TABLE mySource ( " +
    "   a bigint, " +
    "   b bigint " +
    " ) WITH ( " +
    "   'connector.type' = 'kafka', " +
    "   'connector.version' = 'universal', " +
    "   'connector.topic' = 'mytesttopic', " +
    "   'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
    "   'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
    "   'connector.properties.group.id' = 'flink-test-cxy', " +
    "   'connector.startup-mode' = 'latest-offset', " +
    "   'format.type' = 'json' " +
    " ) ");

tableEnvironment.executeSql(
    " CREATE TABLE mysqlsink ( " +
    "   id bigint, " +
    "   game_id varchar, " +
    "   PRIMARY KEY (id) NOT ENFORCED " +
    " ) WITH ( " +
    "   'connector.type' = 'jdbc', " +
    "   'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false', " +
    "   'connector.username' = 'root', " +
    "   'connector.password' = 'root', " +
    "   'connector.table' = 'mysqlsink', " +
    "   'connector.driver' = 'com.mysql.cj.jdbc.Driver', " +
    "   'connector.write.flush.interval' = '2s', " +
    "   'connector.write.flush.max-rows' = '300' " +
    " ) ");

tableEnvironment.executeSql(
    "insert into mysqlsink (id,game_id) values (select a,cast(b as varchar) b from mySource)");
Issue 1: the INSERT statement above fails with:

Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(BIGINT A, VARCHAR(2147483647) B)>)'. Supported form(s): '$SCALAR_QUERY(<RECORDTYPE(SINGLE FIELD)>)'
Issue 2: if the INSERT is changed to

tableEnvironment.executeSql("insert into mysqlsink select a, cast(b as varchar) b from mySource");

it runs, but when a duplicate primary key occurs it fails with:

Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1' for key 'PRIMARY'

(From the volunteer-compiled Flink mailing list archive)
Reference answer:
Hi,
I see you are still using the old options ('connector.type' = 'jdbc', ...). With the old options the primary key is still derived from the query; you need to use the new options [1] ('connector' = 'jdbc', ...) for the declared primary key to take effect and switch the sink to upsert mode.
Best,
Leonard

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options

(From the volunteer-compiled Flink mailing list archive)
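For reference, a minimal sketch of the sink DDL rewritten with the new 1.11 JDBC options (table name, URL, and credentials are taken from the question; the option names follow the JDBC connector page linked in [1]):

CREATE TABLE mysqlsink (
  id BIGINT,
  game_id VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false',
  'table-name' = 'mysqlsink',
  'username' = 'root',
  'password' = 'root',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  -- buffering options comparable to the old flush settings
  'sink.buffer-flush.interval' = '2s',
  'sink.buffer-flush.max-rows' = '300'
);

With the primary key declared and the new option style, the JDBC sink should write in upsert mode, so a duplicate key updates the existing row instead of raising the SQLIntegrityConstraintViolationException from Issue 2. The write itself stays in the form INSERT INTO mysqlsink SELECT a, CAST(b AS VARCHAR) FROM mySource.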
For more answers to this question, see:
https://developer.aliyun.com/ask/371059?spm=a2c6h.13066369.question.22.6ad26382Kh6myX
Question 3: Flink SQL escape character problem
SPLIT_INDEX(${xxx}, ';', 0) — I want to split a string on semicolons, but the semicolon seems to be treated as a special character and the syntax check keeps failing. I have read that it can be escaped, but I could not work out how. Has anyone hit a similar problem and can point me in the right direction? (From the volunteer-compiled Flink mailing list archive)
Reference answer:
SPLIT_INDEX(${xxx}, ';', 0)
The semicolon ';' is not a special character and should not cause a compile error. I tested this with DDL on Flink 1.11.1 and it works, so I am not sure what is different about your environment. U&'\003B' is the Unicode escape for ';', so the Unicode form also works, but that form is normally only needed when splitting on non-printable characters, e.g. \n corresponds to U&'\000A' and \r corresponds to U&'\000D'. For a visible character such as the semicolon, the Unicode escape is unnecessary. (From the volunteer-compiled Flink mailing list archive)
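To illustrate both forms, a quick sketch (the literal input string and the VALUES source are made up for the example):

SELECT
  SPLIT_INDEX(s, ';', 0)       AS by_literal,  -- 'a'
  SPLIT_INDEX(s, U&'\003B', 0) AS by_unicode   -- same result via the Unicode escape
FROM (VALUES ('a;b;c')) AS t (s);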
For more answers to this question, see:
https://developer.aliyun.com/ask/371063?spm=a2c6h.13066369.question.25.6ad26382Ri4TKS
Question 4: Passing Kerberos parameters to the Kafka SQL connector
Flink v1.11.1, with Kerberos enabled on Kafka. The DDL below includes the Kerberos parameters:
CREATE TABLE kafkaTable (
  ...
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'security.protocol' = 'SASL_PLAINTEXT',
  'sasl.mechanism' = 'GSSAPI',
  'sasl.kerberos.service.name' = 'kafka',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
)
Are the parameters above supported? (From the volunteer-compiled Flink mailing list archive)
Reference answer:
I am not sure whether this solves your problem.
I checked, and the documentation is currently missing the section on passing Kafka properties through, so I opened an issue [1] to fill in the docs.
Best,
Leonard

[1] https://issues.apache.org/jira/browse/FLINK-18768

(From the volunteer-compiled Flink mailing list archive)
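For reference, a sketch of what the pass-through form could look like, assuming the 1.11 Kafka SQL connector forwards any option prefixed with 'properties.' to the underlying Kafka client (the pass-through that the documentation issue above is about); the column user_id is a placeholder added for the sketch:

CREATE TABLE kafkaTable (
  user_id BIGINT  -- placeholder column for the sketch
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  -- Kafka client security settings, passed through with the 'properties.' prefix
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'GSSAPI',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
);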
For more answers to this question, see:
https://developer.aliyun.com/ask/371068?spm=a2c6h.13066369.question.26.6ad26382omnRLj
Question 5: How Flink commits Kafka offsets on checkpoint success when Kafka is the source
Hi all, a question: when Flink uses Kafka as the source and checkpointing is enabled, is a record's offset committed only after the record has been processed by the last operator in the pipeline? Or is the offset committed on checkpoint success as soon as the record has been read from the Kafka source, no matter which operator it has reached?
Also, could someone point me to the file where this code lives? Thanks!
-- Best Wishes, Shuwen Zhou (From the volunteer-compiled Flink mailing list archive)
Reference answer:
It sounds like the original question takes "checkpoint success" to mean the source operator's checkpoint succeeding, but notifyCheckpointComplete is only invoked after the checkpoint has succeeded across the entire pipeline. By that point the record at offset 100 must already have been fully processed by the sink operator, because the barrier that triggered the checkpoint necessarily reaches the sink after the record at offset 100. (From the volunteer-compiled Flink mailing list archive)
For more answers to this question, see:
https://developer.aliyun.com/ask/371486?spm=a2c6h.13066369.question.25.6ad26382XYUvkP