Question 1: Flink 1.11.2 CEP RocksDB performance tuning
I am using Flink 1.11.2 CEP for some pattern matching. As soon as I enable RocksDB as the state backend, backpressure appears and CPU usage is 10x what it was before.
private void bufferEvent(IN event, long currentTime) throws Exception {
    long currentTs = System.currentTimeMillis();
    List<IN> elementsForTimestamp = elementQueueState.get(currentTime);
    if (elementsForTimestamp == null) {
        // state miss: time spent on the unsuccessful get
        this.bufferEventGetNullhistogram.update(System.currentTimeMillis() - currentTs);
        elementsForTimestamp = new ArrayList<>();
    } else {
        // state hit: time spent on the successful get
        this.bufferEventGethistogram.update(System.currentTimeMillis() - currentTs);
    }
    elementsForTimestamp.add(event);
    long secondCurrentTs = System.currentTimeMillis();
    elementQueueState.put(currentTime, elementsForTimestamp);
    // time spent on the put alone, and on the whole bufferEvent call
    this.bufferEventPuthistogram.update(System.currentTimeMillis() - secondCurrentTs);
    this.bufferEventhistogram.update(System.currentTimeMillis() - currentTs);
}
I overrode CepOperator and registered some metrics to see where the time goes:
this.bufferEventhistogram = metrics.histogram("buffer_event_delay",
        new DescriptiveStatisticsHistogram(1000));
this.bufferEventGethistogram = metrics.histogram("buffer_event_get_delay",
        new DescriptiveStatisticsHistogram(1000));
this.bufferEventGetNullhistogram = metrics.histogram("buffer_event_get_null_delay",
        new DescriptiveStatisticsHistogram(1000));
this.bufferEventPuthistogram = metrics.histogram("buffer_event_put_delay",
        new DescriptiveStatisticsHistogram(1000));
The get and put calls are the expensive parts: a single bufferEvent call can take up to 200 ms. Judging from the RocksDB metrics, there are not many flushes or compactions happening.
I also tried the tuning advice in https://www.jianshu.com/p/2e61c2c83c57, but it did not help much; the backpressure persists. I have seen the similar question at http://apache-flink.147419.n8.nabble.com/rocksDB-td1785.html, but in my case the SST files are quite small. Why are get and put so expensive here, and is there a good way to optimize this? Thanks. *From the volunteer-compiled Flink mailing-list archive
Reference answer:
@Override
public UV get(UK userKey) throws IOException, RocksDBException {
    byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(
            userKey, userKeySerializer);
    byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
    return (rawValueBytes == null
            ? null
            : deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
}

@Override
public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
    byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(
            userKey, userKeySerializer);
    byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);
    backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
}
Tracing through the source shows that RocksDBMapState has to serialize the key and value on every put and deserialize the value on every get. That should be what makes these calls so expensive. *From the volunteer-compiled Flink mailing-list archive
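Building on that, one mitigation worth sketching (a minimal example, not from the original thread; Event is a hypothetical element type) is to buffer incoming elements in a ListState rather than a MapState holding whole lists: with the RocksDB backend, ListState#add is implemented as a RocksDB merge, so only the newly added element is serialized and the stored list is never read back on the write path.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Event is a placeholder type standing in for the CEP input elements.
public class BufferWithListState extends KeyedProcessFunction<String, Event, Event> {

    private transient ListState<Event> buffered;

    @Override
    public void open(Configuration parameters) {
        buffered = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-events", Event.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        // On RocksDB this appends via the merge operator: no
        // get-deserialize-append-serialize-put cycle over the whole list.
        buffered.add(event);
    }
}

Reading the buffer back still pays the full deserialization cost, but the hot write path no longer does.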
More answers to this question can be viewed at:
https://developer.aliyun.com/ask/371646?spm=a2c6h.13066369.question.84.6ad263826CoWoh
Question 2: error when running the CDC demo code
I am running the demo code from the ververica/flink-cdc-connectors repository on GitHub, and it fails with:
2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out (1/1)] WARN org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1) (7c3ccf7686ccfb33254e8cb785cd339d) switched from RUNNING to FAILED.
java.lang.AbstractMethodError: org.apache.kafka.connect.json.JsonSerializer.configure(Ljava/util/Map;Z)V
at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:583)
at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:80)
at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:301)
at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:217)
at io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
at com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
Source code:
public class CdcTest {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("sohay") // monitor all tables under the sohay database
                .username("root")
                .password("123456")
                .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(sourceFunction).print().setParallelism(1); // use parallelism 1 for the sink to keep message ordering
        env.execute();
    }
}
*From the volunteer-compiled Flink mailing-list archive
Reference answer:
There is most likely another version of the kafka-connect jar in the environment, causing a dependency conflict. *From the volunteer-compiled Flink mailing-list archive
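One quick way to confirm which jar the conflicting class is actually loaded from (a small diagnostic sketch, not from the original answer) is to print its code source at runtime:

// Prints the jar that JsonConverter was loaded from; if it is not the
// kafka-connect version that flink-cdc-connectors expects, there is a
// conflicting artifact on the classpath.
System.out.println(org.apache.kafka.connect.json.JsonConverter.class
        .getProtectionDomain().getCodeSource().getLocation());

The offending dependency can then be tracked down in the build tool's dependency tree and excluded.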
More answers to this question can be viewed at:
https://developer.aliyun.com/ask/371645?spm=a2c6h.13066369.question.83.6ad26382ziEiEA
Question 3: checkpoint timeouts when using Flink CDC
I am just starting to evaluate the Flink SQL CDC feature. My use case is a LEFT JOIN across three CDC tables; since all three tables change over time, CDC seemed like the right fit. The first two tables each hold roughly 200,000 rows; the third has only a few dozen. I join the three tables into a wide table and write the result to MySQL. Each run, the job fails after writing about 100,000 rows, and the logs show the cause is checkpoint timeouts. The state backend is RocksDB. The data volume is not that large, so why do checkpoints time out, i.e. fail to complete within 10 minutes? *From the volunteer-compiled Flink mailing-list archive
Reference answer:
I suggest checking two things:
1. How large are the checkpoints? Are they unusually big?
2. Is the job backpressured? Checkpoints rarely succeed under backpressure, so in that case resolve the backpressure first. *From the volunteer-compiled Flink mailing-list archive
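While investigating, two settings are also worth trying (a minimal Flink 1.11 sketch; the checkpoint path and intervals below are placeholders): raise the checkpoint timeout, and enable incremental RocksDB checkpoints, which upload only new SST files instead of the full state on each checkpoint.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuning {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);                                // checkpoint every 60 s
        env.getCheckpointConfig().setCheckpointTimeout(20 * 60 * 1000L); // allow up to 20 min
        // 'true' enables incremental checkpoints: only changed SST files are uploaded
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        // ... job definition ...
    }
}

If checkpoints stay small and still time out, backpressure is the more likely culprit.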
More answers to this question can be viewed at:
https://developer.aliyun.com/ask/371644?spm=a2c6h.13066369.question.86.6ad263822mbZYT
Question 4: Flink 1.11.0 SQL custom UDAF containing a composite type reports Incompatible types
I am on Flink 1.11.0, with the following custom UDAF:
public class GetContinuousListenDuration extends AggregateFunction<Row, ContinuousListenDuration> {

    private static final DateTimeFormatter dateTimeFormatter =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");

    @Override
    @DataTypeHint("ROW<startTime TIMESTAMP(3), duration BIGINT>")
    public Row getValue(ContinuousListenDuration acc) {
        return Row.of(acc.getStartTime(), acc.getDuration());
    }

    @Override
    public ContinuousListenDuration createAccumulator() {
        return new ContinuousListenDuration();
    }

    public void accumulate(ContinuousListenDuration acc, @DataTypeHint("TIMESTAMP(3)") LocalDateTime dt, Boolean isListening) {
        // logic omitted here
    }
}
The aggregate is called with TIMESTAMP(3) and BOOLEAN arguments, getValue returns ROW<startTime TIMESTAMP(3), duration BIGINT>, and the function is registered as get_continuous_listen_duration. The SQL that calls it:
insert into
report.result
select
id,
city_code,
get_continuous_listen_duration(
dt,
(order_no is null)
or (trim(order_no) = '')
).startTime as start_time,
get_continuous_listen_duration(
dt,
(order_no is null)
or (trim(order_no) = '')
).duration as duration
from
(
select
o.id,
o.dt,
o.order_no,
r.city_code
from
(
select
req [1] as id,
dt,
proctime,
req [2] as order_no
from
tmp_v
where
extra [1] is null
or extra [1] <> 'false'
) o
JOIN dim.right FOR SYSTEM_TIME AS OF o.proctime AS r ON r.id = o.id
) a
group by
id,
city_code
having
get_continuous_listen_duration(
dt,
(order_no is null)
or (trim(order_no) = '')
).duration >= 2
The following exception is thrown at runtime:
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Incompatible types
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_171]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_171]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_171]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_171]
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.fun.SqlDotOperator.deriveType(SqlDotOperator.java:101) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.type.InferTypes.lambda$static$0(InferTypes.java:46) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1980) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateHavingClause(SqlValidatorImpl.java:4214) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3515) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52) ~[flink-table_2.11-1.11.0.jar:1.11.0]
at com.ververica.flink.table.gateway.operation.MultiSqlOperation.lambda$executeInternal$0(MultiSqlOperation.java:119) ~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:223) ~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.operation.MultiSqlOperation.executeInternal(MultiSqlOperation.java:109) ~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]
My question is: is there anything wrong with defining the UDF this way? *From the volunteer-compiled Flink mailing-list archive
Reference answer:
Judging from this frame in the stack trace: at org.apache.calcite.sql.fun.SqlDotOperator.deriveType(SqlDotOperator.java:101), the failure occurs while validating row.startTime or row.duration: type inference finds incompatible types there. Check whether that field access is used correctly. *From the volunteer-compiled Flink mailing-list archive
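If the dot access on the function call result is what trips the validator, one usage change to try (a sketch, not verified against this exact query) is to compute the ROW once in an inner query, alias it, and access its fields on the alias in the outer query:

insert into report.result
select
  id,
  city_code,
  res.startTime as start_time,
  res.duration as duration
from
  (
    select
      id,
      city_code,
      get_continuous_listen_duration(
        dt,
        (order_no is null)
        or (trim(order_no) = '')
      ) as res
    from
      ( ... ) a -- the same joined subquery as in the original statement
    group by
      id,
      city_code
  ) t
where
  res.duration >= 2

This also avoids evaluating the UDAF three times for the same group.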
More answers to this question can be viewed at:
https://developer.aliyun.com/ask/371643?spm=a2c6h.13066369.question.85.6ad26382Qobcby
Question 5: Flink 1.11.1 web UI shows zero records in the source operator's detail view
My Flink job runs normally, but when I open the source operator's detail view in the web UI, metrics such as Records Sent are always 0. What could be the problem? *From the volunteer-compiled Flink mailing-list archive
Reference answer:
What does your job's DAG look like? Possible causes:
1. The source never received any data, or never emitted anything downstream.
2. The source is chained with its downstream operators, so nothing shows up at the (non-existent) task boundary. *From the volunteer-compiled Flink mailing-list archive
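If chaining is the cause, the chain can be broken so each operator becomes its own task and reports metrics across a real task boundary (a minimal sketch; breaking chains adds serialization overhead between tasks, so this is best kept to debugging):

// Detach the source from its chain so its "Records Sent" counts records
// crossing an actual task boundary to the sink.
env.addSource(sourceFunction)
        .disableChaining()
        .print()
        .setParallelism(1);

// Alternatively, disable chaining for the whole job:
// env.disableOperatorChaining();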
More answers to this question can be viewed at:
https://developer.aliyun.com/ask/371642?spm=a2c6h.13066369.question.88.6ad26382p86CnM