Question 1: Writing JSON data to HDFS with Flink 1.11 DDL
After switching the format from parquet to json, checkpoints complete successfully, but the output files stay in the in-progress state forever. How do I get them finalized?
With parquet the files are already committed successfully.
*From the volunteer-curated Flink mailing-list archive
Reference answer:
As [1] explains, for csv and json you also need to configure the rolling-policy parameters, because for these row formats files are not forcibly rolled at checkpoint time.
NOTE: For row formats (csv, json), you can set the parameter sink.rolling-policy.file-size or sink.rolling-policy.rollover-interval in the connector properties and parameter execution.checkpointing.interval in flink-conf.yaml together if you don't want to wait a long period before observe the data exists in file system. For other formats (avro, orc), you can just set parameter execution.checkpointing.interval in flink-conf.yaml.
So if you want to roll files based on time, you also need to configure sink.rolling-policy.rollover-interval and sink.rolling-policy.check-interval, as in the sketch below.
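For example, a JSON filesystem sink that rolls by time as well as by size might be declared like this (a minimal sketch: the table name, schema and path are hypothetical, tEnv is assumed to be an existing StreamTableEnvironment, and execution.checkpointing.interval must still be set in flink-conf.yaml):

    // Hypothetical JSON filesystem sink with explicit rolling configuration.
    tEnv.executeSql(
        "CREATE TABLE fs_json_sink (" +
        "  user_id STRING," +
        "  order_amount DOUBLE" +
        ") WITH (" +
        "  'connector' = 'filesystem'," +
        "  'path' = 'hdfs:///tmp/json_sink'," +                   // hypothetical path
        "  'format' = 'json'," +
        "  'sink.rolling-policy.rollover-interval' = '15 min'," + // roll at least every 15 min
        "  'sink.rolling-policy.check-interval' = '1 min'," +     // how often rolling is evaluated
        "  'sink.rolling-policy.file-size' = '128MB'" +           // ...or once a file reaches 128MB
        ")");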
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#rolling-policy
*From the volunteer-curated Flink mailing-list archive
For more answers to this question, see:
https://developer.aliyun.com/ask/370217?spm=a2c6h.12873639.article-detail.68.6f9243783Lv0fl
Question 2: Checkpoint usage in Flink 1.11
When I use the Flink 1.11.0 DDL with the debezium-json format for CDC, then after restoring from a checkpoint, newly arriving records with op=d fail to be deleted.

Restart command:

    ./bin/flink run -m yarn-cluster /root/bigdata-flink-1.0.jar -s hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata

Code:

    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(6000L);            // checkpoint timeout
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);         // max concurrent checkpoints
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L);     // min pause between checkpoints
    env.getCheckpointConfig().setPreferCheckpointForRecovery(true);   // prefer checkpoints for recovery
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); // tolerable checkpoint failures
    // checkpoint retention policy / external checkpoint path
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setStateBackend(new FsStateBackend(
            new URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
    // NOTE: a statement was truncated in the original mail; only the fragment
    // "TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));" survived,
    // presumably the tail of an env.setRestartStrategy(...) call.

    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

    String sourceDDL = String.format(
            "CREATE TABLE debezium_source (" +
            "  id INT NOT NULL," +
            "  name STRING," +
            "  description STRING," +
            "  weight DOUBLE" +
            ") WITH (" +
            "  'connector' = 'kafka-0.11'," +
            "  'topic' = '%s'," +
            "  'properties.bootstrap.servers' = '%s'," +
            "  'scan.startup.mode' = 'group-offsets'," +
            "  'format' = 'debezium-json'" +
            ")", "ddd", "172.22.20.206:9092");

    String sinkDDL =
            "CREATE TABLE sink (" +
            "  id INT NOT NULL," +
            "  name STRING," +
            "  description STRING," +
            "  weight DOUBLE," +
            "  PRIMARY KEY (id, name, description, weight) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'jdbc'," +
            "  'url' = 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
            "  'table-name' = 'products'," +
            "  'driver' = 'com.mysql.cj.jdbc.Driver'," +
            "  'username' = 'DataPip'," +
            "  'password' = 'DataPip'" +
            ")";

    String dml = "INSERT INTO sink SELECT id, name, description, weight " +
            "FROM debezium_source GROUP BY id, name, description, weight";

    tEnv.executeSql(sourceDDL);
    tEnv.executeSql(sinkDDL);
    tEnv.executeSql(dml);
*From the volunteer-curated Flink mailing-list archive
Reference answer:
Why GROUP BY id, name, description, weight at all? Doesn't a plain "INSERT INTO sink SELECT id, name, description, weight FROM debezium_source" meet the requirement?
*From the volunteer-curated Flink mailing-list archive
For more answers to this question, see:
https://developer.aliyun.com/ask/370218?spm=a2c6h.12873639.article-detail.69.6f9243783Lv0fl
Question 3: Job submission in Flink 1.11
A question about job submission in Flink 1.11: if a single program contains both a SQL DML statement submitted via executeSql and a DataStream.addSink pipeline submitted via StreamExecutionEnvironment.execute, a yarn per-job deployment appears to submit two jobs (the pattern is sketched below). How should I handle this if I only want one job to be submitted?
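A minimal sketch of the pattern in question (the source, sink and SQL statements are hypothetical):

    // tEnv.executeSql(INSERT ...) submits its own job as soon as it is called,
    // and env.execute() later submits a second job for the DataStream pipeline.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    tEnv.executeSql("INSERT INTO sink_a SELECT * FROM source_a"); // job 1

    env.addSource(new MySourceFunction())  // hypothetical SourceFunction
       .addSink(new MySinkFunction());     // hypothetical SinkFunction
    env.execute("datastream-pipeline");    // job 2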
*From the volunteer-curated Flink mailing-list archive
Reference answer:
This is currently not possible. executeSql and table-to-DataStream conversions are optimized and submitted as separate jobs.
Even before 1.11, a table-to-DataStream conversion was never optimized together with sqlUpdate or insertInto statements: although only one job was submitted, it contained two independent pipelines with no computation shared between them, which is essentially no different from two jobs.
*From the volunteer-curated Flink mailing-list archive
For more answers to this question, see:
https://developer.aliyun.com/ask/370220?spm=a2c6h.12873639.article-detail.70.6f9243783Lv0fl
Question 4: Writing to an authenticated ES6 cluster from Flink 1.9
How can Flink write data to an Elasticsearch cluster that requires authentication? I couldn't find anywhere to configure the username and password. Could someone help me out?
*来自志愿者整理的flink邮件归档
Reference answer:
Authentication support for the SQL connector has been implemented in FLINK-18361 [1]; the 1.12 release will ship this feature.
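On 1.12, the credentials can then be passed in the connector options; a minimal sketch (the host, index and credentials are hypothetical, and tEnv is assumed to be an existing StreamTableEnvironment):

    // Flink 1.12+ Elasticsearch SQL connector with basic authentication.
    tEnv.executeSql(
        "CREATE TABLE es_sink (" +
        "  user_id STRING," +
        "  cnt BIGINT" +
        ") WITH (" +
        "  'connector' = 'elasticsearch-7'," +
        "  'hosts' = 'http://es-host:9200'," + // hypothetical host
        "  'index' = 'users'," +
        "  'username' = 'elastic'," +          // options added by FLINK-18361
        "  'password' = '<password>'" +
        ")");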
[1] https://issues.apache.org/jira/browse/FLINK-18361
*From the volunteer-curated Flink mailing-list archive
For more answers to this question, see:
https://developer.aliyun.com/ask/370221?spm=a2c6h.12873639.article-detail.71.6f9243783Lv0fl
Question 5: A Flink state question
Hi all,
I have a deduplication requirement and want to save memory by using a BloomFilter. The code is as follows:
    .keyBy(_._1).process(new KeyedProcessFunction[String, (String, String), String] {
      // keyed state holding one Guava BloomFilter per key
      var state: ValueState[BloomFilter[CharSequence]] = _

      override def open(parameters: Configuration): Unit = {
        val stateDesc = new ValueStateDescriptor(
          "state", TypeInformation.of(new TypeHint[BloomFilter[CharSequence]] {}))
        state = getRuntimeContext.getState(stateDesc)
      }

      override def processElement(
          value: (String, String),
          ctx: KeyedProcessFunction[String, (String, String), String]#Context,
          out: Collector[String]): Unit = {
        var filter = state.value
        if (filter == null) {
          println("null filter")
          // the create(...) arguments were garbled in the original mail; Guava's
          // BloomFilter.create needs a Funnel and an expected insertion count, e.g.:
          filter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 1000000)
        }
        if (!filter.mightContain(value._2)) {
          filter.put(value._2)
          state.update(filter)
          out.collect(value._2)
        }
      }
    })
From the logs I can see that every time the job is restored from a savepoint, the BloomFilter in this state is null. Why is that?
*From the volunteer-curated Flink mailing-list archive
Reference answer:
You can use the state-processor-api [1] to look at the contents of the state in the savepoint and narrow the problem down: if the state is already missing from the savepoint, then serializing it into the savepoint went wrong; if the savepoint does contain it, then something went wrong during restore. A sketch of such a check follows.
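A minimal Java sketch of reading that ValueState back out of a savepoint with the State Processor API (the savepoint path and the operator uid "dedup-uid" are hypothetical; the process operator must have been given a uid for readKeyedState to address it, and the flink-state-processor-api dependency is required):

    import com.google.common.hash.BloomFilter;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.state.api.ExistingSavepoint;
    import org.apache.flink.state.api.Savepoint;
    import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
    import org.apache.flink.util.Collector;

    public class InspectSavepoint {
      public static void main(String[] args) throws Exception {
        // Read the BloomFilter ValueState for every key in the savepoint, so we
        // can check whether it was written at all (path and uid are hypothetical).
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint sp = Savepoint.load(
            bEnv, "hdfs:///flink/savepoints/savepoint-xxx", new MemoryStateBackend());

        DataSet<String> perKey = sp.readKeyedState("dedup-uid",
            new KeyedStateReaderFunction<String, String>() {
              ValueState<BloomFilter<CharSequence>> state;

              @Override
              public void open(Configuration parameters) {
                state = getRuntimeContext().getState(new ValueStateDescriptor<>(
                    "state", // must match the descriptor name used in the job
                    TypeInformation.of(new TypeHint<BloomFilter<CharSequence>>() {})));
              }

              @Override
              public void readKey(String key, Context ctx, Collector<String> out)
                  throws Exception {
                out.collect(key + " -> " + (state.value() == null ? "null" : "present"));
              }
            });
        perKey.print();
      }
    }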
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
*From the volunteer-curated Flink mailing-list archive
For more answers to this question, see:
https://developer.aliyun.com/ask/370223?spm=a2c6h.12873639.article-detail.72.6f9243783Lv0fl