
Flink with Elasticsearch 6.0.0 sink: NoSuchMethodError on BulkProcessor.builder

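I am building a Flink streaming word-count demo with an Elasticsearch sink (Elasticsearch version 6.0.0) and get the following error. It looks like a dependency conflict.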

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.builder(Ljava/util/function/BiConsumer;Lorg/elasticsearch/action/bulk/BulkProcessor$Listener;)Lorg/elasticsearch/action/bulk/BulkProcessor$Builder;

at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at com.quvideo.xiaoying.flink.elasticsearch.WordCountSinkElasticsearch.main(WordCountSinkElasticsearch.java:68)

Caused by: java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.builder(Ljava/util/function/BiConsumer;Lorg/elasticsearch/action/bulk/BulkProcessor$Listener;)Lorg/elasticsearch/action/bulk/BulkProcessor$Builder;

at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createBulkProcessorBuilder(Elasticsearch6ApiCallBridge.java:92)
at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createBulkProcessorBuilder(Elasticsearch6ApiCallBridge.java:45)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.buildBulkProcessor(ElasticsearchSinkBase.java:353)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:297)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)

Process finished with exit code 1
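
The long string after `BulkProcessor.builder` in the error is a JVM method descriptor: parameter types in parentheses, then the return type, with `$` separating a nested class from its outer class. A minimal sketch of how such a descriptor is formed, using JDK stand-in types rather than the Elasticsearch classes (`DescriptorDemo` and `descriptor` are hypothetical names chosen for illustration):

```java
import java.lang.invoke.MethodType;
import java.util.function.BiConsumer;

final class DescriptorDemo {

    /** Builds the JVM descriptor for a method with the given return and parameter types. */
    static String descriptor(Class<?> returnType, Class<?>... parameterTypes) {
        return MethodType.methodType(returnType, parameterTypes).toMethodDescriptorString();
    }

    public static void main(String[] args) {
        // Stand-in for a signature shaped like: StringBuilder builder(BiConsumer, Runnable)
        System.out.println(descriptor(StringBuilder.class, BiConsumer.class, Runnable.class));
        // A nested type shows the '$' separator seen in BulkProcessor$Listener
        System.out.println(descriptor(void.class, java.util.Map.Entry.class));
    }
}
```

Read this way, the error says that the connector called a static `builder` method taking a `BiConsumer` and a `BulkProcessor.Listener`, and the `BulkProcessor` class found at runtime has no method with exactly that signature.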
My Elasticsearch cluster is 6.0.0, and the Flink dependencies are as follows:

<properties>
    <flink.version>1.6.0</flink.version>
    <elastic>6.0.0</elastic>
</properties>


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

The Elasticsearch-related dependencies are as follows:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>${elastic}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>${elastic}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${elastic}</version>
</dependency>

And the Elasticsearch-related code:

public static ElasticsearchSink<WordWithCount> getEsSink(){
    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("10.0.35.148", 9200, "http"));
    ElasticsearchSink.Builder<WordWithCount> esSinkBuilder = new ElasticsearchSink.Builder<>(
            httpHosts,
            new ElasticsearchSinkFunction<WordWithCount>() {
                public IndexRequest createIndexRequest(WordWithCount element) {
                    Map<String, Object> json = new HashMap<>();
                    json.put("word", element.word);
                    json.put("count", element.count);

                    return Requests.indexRequest()
                            .index("wordcount_idx")
                            .type("test_type")
                            .source(json);
                }

                @Override
                public void process(WordWithCount element, RuntimeContext ctx, RequestIndexer indexer) {
                    indexer.add(createIndexRequest(element));
                }
            }
    );
    esSinkBuilder.setBulkFlushMaxActions(1);
    return esSinkBuilder.build();
}

In detail, the error is triggered in this method of Elasticsearch6ApiCallBridge.java:

@Override
public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener listener) {
    return BulkProcessor.builder(client::bulkAsync, listener); // error message: "client is not a functional interface"
}
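
One way to check the suspected dependency conflict is to ask the JVM which jar `BulkProcessor` is actually loaded from and which `builder` overloads that jar provides. The following is a minimal sketch, not part of the original post; `ClasspathProbe`, `locationOf`, and `overloads` are hypothetical names, and the program assumes it is run with the same classpath as the Flink job.

```java
import java.lang.reflect.Method;
import java.net.URL;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.List;

final class ClasspathProbe {

    /** Returns the jar or directory a class was loaded from, or "(bootstrap)" for JDK classes. */
    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null) {
            return "(bootstrap)";
        }
        URL location = source.getLocation();
        return location == null ? "(unknown)" : location.toString();
    }

    /** Lists the signatures of all public methods with the given name on the class. */
    static List<String> overloads(Class<?> type, String methodName) {
        List<String> found = new ArrayList<>();
        for (Method m : type.getMethods()) {
            if (m.getName().equals(methodName)) {
                found.add(m.toGenericString());
            }
        }
        return found;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Defaults match the class and method named in the stack trace.
        String className = args.length > 0 ? args[0] : "org.elasticsearch.action.bulk.BulkProcessor";
        String methodName = args.length > 1 ? args[1] : "builder";

        // Load without running static initializers; only metadata is needed here.
        Class<?> type = Class.forName(className, false, ClasspathProbe.class.getClassLoader());

        System.out.println("Loaded from: " + locationOf(type));
        for (String signature : overloads(type, methodName)) {
            System.out.println("  " + signature);
        }
    }
}
```

If the printed jar is the 6.0.0 `elasticsearch` artifact and no listed `builder` overload takes a `BiConsumer` as its first parameter, the classpath holds an older Elasticsearch than the one the Flink connector was compiled against, which would explain the `NoSuchMethodError`.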

社区小助手 2018-12-11 16:29:54
2 answers
  • Has this problem been solved?

    2022-07-08 19:50:33
  • I filed https://issues.apache.org/jira/browse/FLINK-10173. As a workaround, you can simply add:


    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>6.3.1</version>
    </dependency>

    2019-07-17 23:19:52