问题一:集群重启如何保证带状态的任务自动从最近一个checkpoint恢复?
集群重启如何保证带状态的任务自动从最近一个checkpoint恢复?
参考回答:
FixedDelaStrategy 默认是从最近一个ck 恢复,其他的策略可以看官网。如果你是想问怎么实现的,不建议在邮件列表里问实现原理的问题。可以google找相关文章、相关flip 或者 直接debug源码。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359846
问题二:关于savepoint恢复问题咨询?
hi, 社区 版本flink 1.7 我正在尝试从保存点(或检查点)还原flink作业,该作业的工作是从kafka读取->执行30分钟的窗口聚合(只是AggregationFunction,就像一个计数器)->下沉到kafka。 我使用rocksdb和启用检查点。 现在我尝试手动触发一个保存点。 每个汇总的期望值是30(1个数据/每分钟)。 但是,当我从保存点还原时(flink运行-d -s {savepoint的url}),聚合值不是30(小于30,取决于我取消flink作业并还原的时间)。 但是当作业正常运行时,它将达到30。 我不知道为什么有些数据似乎会丢失? 日志显示``No restore state for FlinkKafkaConsumer'' 四川省成都市高新区天府三街199号 太平洋保险金融大厦A区11楼 11th Floor,Tower A,Pacific insurance finance Building, No.199 TianFu 3rd Street, GaoXin District, Chengdu, Sichuan Province Mobile +86 15817382279 Email wangchunhao@navercorp.com
NCloud
-----Original Message----- From: "王春浩"wa...@navercorp.com To: us...@flink.apache.org; Cc: Sent: 2021/5/26周三 17:03 (GMT+08:00) Subject: inquire about restore from savepoint
Hi Community, version flink 1.7 im trying to make a flink job restore from a savepoint(or checkpoint), what the job do is reading from kafka -> do a 30-minutes-window aggregation(just AggregationFunction, acts like a counter) -> sink to kafka. i use rocksdb and enabled checkpoint. now i try to trigger a savepoint manually. the expected value of each aggregated one is 30(1 data/per minute). but when i restore from a savepoint(flink run -d -s {savepoint's url}), the aggregated value is not 30(less than 30, depends on the time i cancel flink job and restore). but when the job run normally, it gets 30. i don't know why could some data seems to be lost? and a log shows "No restore state for FlinkKafkaConsumer"*来自志愿者整理的flink邮件归档
参考回答:
看下你的 flink 命令对不对,然后去 Flink Web UI Checkpoint 界面,看下是否从 Savepoint 恢复(下面有个
restore path).
之后再看下你的窗口时间类型用的是什么。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359849
问题三:有哪些flink状态查看工具可以使用?
我有一个flink sql写的数据实时同步作业,从mysql binlog cdc消费发到mongodb,仅此而已,没有lookup,也没有join。 查看checkpoint页显示状态有17MB,checkpoint耗时要2s。 想知道为什么状态会如此之大,有没有状态查看工具看看里面到底存了什么信息?*来自志愿者整理的flink邮件归档
参考回答:
可以使用 State Processor [1]。
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359851
问题四:flink sql写mysql中文有乱码问题怎么办?
我的flink sql作业如下
SELECT product_name, window_start, window_end, CAST(SUM(trans_amt)ASDECIMAL(24,2)) trans_amt, CAST(COUNT(order_no)ASBIGINT) trans_cnt, -- LOCALTIMESTAMP AS insert_time, '微支付事业部'AS bus_name FROM(
mysql sink表的定义如下 CREATE TABLE XXX ( ) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT CHARSET=utf8mb4;
运行起来后写入mysql表的数据带有中文乱码 ??????
查看作业运行日志后发现其使用了 UTF-16LE 字符集,有什么办法可以让其使用 utf8mb4 字符集么? 2021-05-17 18:02:25,010 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task GroupAggregate(groupBy=[product_name, window_start, window_end], select=[product_name, window_start, window_end, SUM_RETRACT(trans_amt) AS f3,COUNTRETRACT(orderno)ASf3,COUNTRETRACT(orderno)ASf3, COUNT_RETRACT(order_no) AS f4]) -> Calc(select=[CAST(product_name) AS product_name, (CAST(window_start) DATE_FORMAT _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS window_start, (CAST(window_end) DATE_FORMAT _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS window_end, CAST(f3)AStransamt,CAST(f3)AStransamt,CAST(f3) AS trans_amt, CAST(f4) AS trans_cnt, CAST(()) AS insert_time, _UTF-16LE'??????????????????':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS bus_name]) -> Sink: Sink(table=[default_catalog.default_database.all_trans_5m_new], fields=[product_name, window_start, window_end, trans_amt, trans_cnt, insert_time, bus_name]) (1/1)#0 (1b5f26dcd9a6071f36753b93a0ea9bea), deploy into slot with allocation id 9f4c7d45bdf429f89158e2f8451663e0. 2021-05-17 18:02:25,013 INFO org.apache.flink.runtime.taskmanager.Task [] - GroupAggregate(groupBy=[product_name, window_start, window_end, id, data_type, mer_cust_id, order_no, trans_date], select=[product_name, window_start, window_end, id, data_type, mer_cust_id, order_no, trans_date, MAX_RETRACT(trans_amt) AS trans_amt]) -> Calc(select=[product_name, window_start, window_end, trans_amt, order_no]) (1/1)#0 (ef6b0a94e75cc1665e4ce3d40e74ab0c) switched from CREATED to DEPLOYING.*来自志愿者整理的flink邮件归档
参考回答:
你在flinkSQL连接mysql表的时候配置url=jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8,像这样
CREATE TABLE jdbc_sink(
id INT COMMENT '订单id',
goods_name VARCHAR(128) COMMENT '商品名称',
price DECIMAL(32,2) COMMENT '商品价格',
user_name VARCHAR(64) COMMENT '用户名称'
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8',
'username' = 'mysqluser',
'password' = 'mysqluser',
'table-name' = 'jdbc_sink'
)
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359853
问题五:flink sql支持Common Table Expression (CTE)吗?
flink sql支持Common Table Expression (CTE)吗?是不是可以通过 create temporary view xxx 来实现?CTE和temporary view的区别是什么? 例如 with toronto_ppl as ( SELECT DISTINCT name FROM population WHERE country = "Canada" AND city = "Toronto" ) , avg_female_salary as ( SELECT AVG(salary) as avgSalary FROM salaries WHERE gender = "Female" ) SELECT name , salary FROM People WHERE name in (SELECT DISTINCT FROM toronto_ppl) AND salary >= (SELECT avgSalary FROM avg_female_salary)*来自志愿者整理的flink邮件归档
参考回答:
支持。
如果只是在单个sql中复用expression,和temporary view基本一样,区别不大。
在某些优化路径上不同,一般没有实质影响。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359854