开发者社区> 问答> 正文

flink 1.11.2 cep rocksdb 性能调优

我这边用flink1.11.2 cep做一些模式匹配,发现一旦开启rocksdb做为状态后端,就会出现反压。cpu使用率是之前的10倍。

private void bufferEvent(IN event, long currentTime) throws Exception {

long currentTs = System.currentTimeMillis();

List elementsForTimestamp = elementQueueState.get(currentTime);

if (elementsForTimestamp == null) {

this.bufferEventGetNullhistogram.update(System.currentTimeMillis()

  • currentTs);

elementsForTimestamp = new ArrayList<>();

}else {

this.bufferEventGethistogram.update(System.currentTimeMillis()-currentTs);

}

elementsForTimestamp.add(event);

long secondCurrentTs = System.currentTimeMillis();

elementQueueState.put(currentTime, elementsForTimestamp);

this.bufferEventPuthistogram.update(System.currentTimeMillis() -

secondCurrentTs);

this.bufferEventhistogram.update(System.currentTimeMillis() - currentTs);

}

通过复写CepOperator,加入了一些metics发现

this.bufferEventhistogram = metrics.histogram("buffer_event_delay", new

DescriptiveStatisticsHistogram(1000));

this.bufferEventGethistogram = metrics.histogram("buffer_event_get_delay", new

DescriptiveStatisticsHistogram(1000));

this.bufferEventGetNullhistogram =

metrics.histogram("buffer_event_get_null_delay", new

DescriptiveStatisticsHistogram(1000));

this.bufferEventPuthistogram = metrics.histogram("buffer_event_put_delay", new

DescriptiveStatisticsHistogram(1000));

在get和put比较耗时,整个bufferEvent 能达到200ms

从rocksdb的metric来看没有进行太多flush和compaction。

[image: image.png]

[image: image.png]

也参考了https://www.jianshu.com/p/2e61c2c83c57这篇文章调优过,发现效果也不是很好,一样反压。

也看过类似的问题http://apache-flink.147419.n8.nabble.com/rocksDB-td1785.html

,但是我这sst文件很小。

请教大家,为啥get和put这么耗时呢?有什么好的优化方案不?谢谢。*来自志愿者整理的flink邮件归档

展开
收起
说了是一只鲳鱼 2021-12-07 11:19:35 1484 0
1 条回答
写回答
取消 提交回答
  • ‘@Override

    public UV get(UK userKey) throws IOException, RocksDBException {

    byte[] rawKeyBytes =

    serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey,

    userKeySerializer);

    byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);

    return (rawValueBytes == null ? null :

    deserializeUserValue(dataInputView, rawValueBytes,

    userValueSerializer));

    }

    @Override

    public void put(UK userKey, UV userValue) throws IOException, RocksDBException {

    byte[] rawKeyBytes =

    serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey,

    userKeySerializer);

    byte[] rawValueBytes = serializeValueNullSensitive(userValue,

    userValueSerializer);

    backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);

    }

    通过源码跟踪发现,RocksDBMapState每次get和put都需要序列化和反序列化。。。应该是这个原因导致比较耗时。*来自志愿者整理的flink邮件归档

    2021-12-07 11:22:33
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载