开发者社区> 问答> 正文

使用阿里云es transportclient None of the configured nodes are available

访问阿里云es建库或者服务启动总会报None of the configured nodes are available,历史服务采用transportclient,官方不推荐,但是老服务改造成本太大.代码如下
@Bean

public Client client() {
    Settings esSettings = Settings.builder()
            .put("cluster.name", EsClusterName)
            .put("xpack.security.user", esUsername + ":" + esPassword)
            .build();

    TransportClient tclient = new PreBuiltXPackTransportClient(esSettings);
    try {
        String[] nodes = EsHosts.split(",");
        for (String node : nodes) {
            node = node.trim();
            if (node.length() > 0) {//跳过为空的node(当开头、结尾有逗号或多个连续逗号时会出现空node)
                String[] hostPort = node.split(":");
                int port = 9300;
                if (hostPort.length == 2) {
                    port = Integer.parseInt(hostPort[1]);
                }
                tclient.addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName(hostPort[0]), port));
            }
        }
    }catch (UnknownHostException e) {
        log.error(e.getMessage(),e);
    }
    return tclient;
}

@Bean
public BulkProcessor getBulkProcessor() {
    BulkProcessor bulkProcessor = null;
    bulkProcessor = BulkProcessor.builder(client(),
            new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    log.info( "提交" + response.getItems().length + "个文档,用时"
                            + response.getTookInMillis() + "MS" + (response.hasFailures() ? " 有文档提交失败!" : ""));
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                    //提交结束且失败时调用
                    log.error( " 有文档提交失败!after failure=" + failure.getCause() == null ? failure.getMessage() : failure.getCause().toString());
                }
            })

            .setBulkActions(bulkActions)
            .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
            .setFlushInterval(TimeValue.timeValueSeconds(30))
            .setConcurrentRequests(1)
            .build();

    return bulkProcessor;
}

展开
收起
feihaodong 2018-09-10 19:13:32 5425 0
4 条回答
写回答
取消 提交回答
问答地址:
问答排行榜
最热
最新

相关电子书

更多
Higher-Level APIs in TensorFlo 立即下载
How to overcome mysterious problems caused by large and multi-tenant hadoop cluster at Rakuten 立即下载
Scaling Spark applications by connecting code to resource consumption 立即下载