Elasticsearch java api(五) Bulk批量索引

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 这篇博客介绍一下Elasticsearch对多个文档进行索引的简便方法。Bulk api的支持可以实现一次请求执行批量的添加、删除、更新等操作.

这篇博客介绍一下Elasticsearch对多个文档进行索引的简便方法。Bulk api的支持可以实现一次请求执行批量的添加、删除、更新等操作.Bulk操作使用的是UDP协议,UDP无法确保与ElasticSearch服务器通信时不丢失数据.

一、Bulk API

使用bulk命令时,REST API以_bulk结尾,批量操作写在json文件中,官网给出的语法格式:

action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
....
action_and_meta_data\n
optional_source\n

也就是说每一个操作都有2行数据组成,末尾要回车换行。第一行用来说明操作命令和原数据、第二行是自定义的选项.举个例子,同时执行插入2条数据、删除一条数据, 新建bulkdata.json,写入如下内容:

{ "create" : { "_index" : "blog", "_type" : "article", "_id" : "3" }}
{ "title":"title1","posttime":"2016-07-02","content":"内容一" }

{ "create" : { "_index" : "blog", "_type" : "article", "_id" : "4" }}
{ "title":"title2","posttime":"2016-07-03","content":"内容2" }

{ "delete":{"_index" : "blog", "_type" : "article", "_id" : "1" }}

执行:

$ curl -XPOST "http://localhost:9200/_bulk?pretty" --data-binary @bulkAdd.json
{
  "took" : 11,
  "errors" : false,
  "items" : [ {
    "create" : {
      "_index" : "blog",
      "_type" : "article",
      "_id" : "13",
      "_version" : 1,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "failed" : 0
      },
      "status" : 201
    }
  } ]
}

注意:行末要回车换行,不然会因为命令不能识别而出现错误.

$ curl -XPOST "http://localhost:9200/_bulk?pretty" --data-binary @bulkAdd.json 
{
  "error" : {
    "root_cause" : [ {
      "type" : "action_request_validation_exception",
      "reason" : "Validation Failed: 1: no requests added;"
    } ],
    "type" : "action_request_validation_exception",
    "reason" : "Validation Failed: 1: no requests added;"
  },
  "status" : 400
}

二、批量导出

下面的例子是把索引库中的文档以json格式批量导出到文件中,其中集群名称为”bropen”,索引库名为”blog”,type为”article”,项目根目录下新建files/bulk.txt,索引内容写入bulk.txt中:

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHits;

public class ElasticSearchBulkOut {

    public static void main(String[] args) {

        try {
            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", "bropen").build();// cluster.name在elasticsearch.yml

            Client client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName("127.0.0.1"), 9300));

            QueryBuilder qb = QueryBuilders.matchAllQuery();
            SearchResponse response = client.prepareSearch("blog")
                    .setTypes("article").setQuery(QueryBuilders.matchAllQuery())
                    .execute().actionGet();
            SearchHits resultHits = response.getHits();

            File article = new File("files/bulk.txt");
            FileWriter fw = new FileWriter(article);
            BufferedWriter bfw = new BufferedWriter(fw);

            if (resultHits.getHits().length == 0) {
                System.out.println("查到0条数据!");

            } else {
                for (int i = 0; i < resultHits.getHits().length; i++) {
                    String jsonStr = resultHits.getHits()[i]
                            .getSourceAsString();
                    System.out.println(jsonStr);
                    bfw.write(jsonStr);
                    bfw.write("\n");
                }
            }
            bfw.close();
            fw.close();

        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}

这里写图片描述

三、批量导入

从刚才导出的bulk.txt文件中按行读取,然后bulk导入。首先通过调用client.prepareBulk()实例化一个BulkRequestBuilder对象,调用BulkRequestBuilder对象的add方法添加数据。实现代码:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class ElasticSearchBulkIn {

    public static void main(String[] args) {

        try {

            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", "bropen").build();// cluster.name在elasticsearch.yml中配置

            Client client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName("127.0.0.1"), 9300));

            File article = new File("files/bulk.txt");
            FileReader fr=new FileReader(article);
            BufferedReader bfr=new BufferedReader(fr);
            String line=null;
            BulkRequestBuilder bulkRequest=client.prepareBulk();
            int count=0;
            while((line=bfr.readLine())!=null){
                bulkRequest.add(client.prepareIndex("test","article").setSource(line));
                if (count%10==0) {
                    bulkRequest.execute().actionGet();
                }
                count++;
                //System.out.println(line);
            }
            bulkRequest.execute().actionGet();

            bfr.close();
            fr.close();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}

参考文档:

  1. Elasticsearch Reference [2.3] » Document APIs » Bulk API
相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
11天前
|
存储 人工智能 自然语言处理
Elasticsearch Inference API增加对阿里云AI的支持
本文将介绍如何在 Elasticsearch 中设置和使用阿里云的文本生成、重排序、稀疏向量和稠密向量服务,提升搜索相关性。
54 14
Elasticsearch Inference API增加对阿里云AI的支持
|
7天前
|
Java API 数据处理
探索Java中的Lambda表达式与Stream API
【10月更文挑战第22天】 在Java编程中,Lambda表达式和Stream API是两个强大的功能,它们极大地简化了代码的编写和提高了开发效率。本文将深入探讨这两个概念的基本用法、优势以及在实际项目中的应用案例,帮助读者更好地理解和运用这些现代Java特性。
|
13天前
|
Java 大数据 API
别死脑筋,赶紧学起来!Java之Steam() API 常用方法使用,让开发简单起来!
分享Java Stream API的常用方法,让开发更简单。涵盖filter、map、sorted等操作,提高代码效率与可读性。关注公众号,了解更多技术内容。
|
26天前
|
分布式计算 Java 大数据
大数据-147 Apache Kudu 常用 Java API 增删改查
大数据-147 Apache Kudu 常用 Java API 增删改查
25 1
|
21天前
|
SQL Java API
深入探索Java的持久化技术——JPA(Java Persistence API)
【10月更文挑战第10天】深入探索Java的持久化技术——JPA(Java Persistence API)
15 0
|
21天前
|
Java API 数据库
深入探索Java的持久化技术——JPA(Java Persistence API)
【10月更文挑战第10天】深入探索Java的持久化技术——JPA(Java Persistence API)
27 0
|
24天前
|
存储 JSON Java
elasticsearch学习一:了解 ES,版本之间的对应。安装elasticsearch,kibana,head插件、elasticsearch-ik分词器。
这篇文章是关于Elasticsearch的学习指南,包括了解Elasticsearch、版本对应、安装运行Elasticsearch和Kibana、安装head插件和elasticsearch-ik分词器的步骤。
85 0
elasticsearch学习一:了解 ES,版本之间的对应。安装elasticsearch,kibana,head插件、elasticsearch-ik分词器。
|
2月前
|
NoSQL 关系型数据库 Redis
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
|
3月前
|
数据可视化 Docker 容器
一文教会你如何通过Docker安装elasticsearch和kibana 【详细过程+图解】
这篇文章提供了通过Docker安装Elasticsearch和Kibana的详细过程和图解,包括下载镜像、创建和启动容器、处理可能遇到的启动失败情况(如权限不足和配置文件错误)、测试Elasticsearch和Kibana的连接,以及解决空间不足的问题。文章还特别指出了配置文件中空格的重要性以及环境变量中字母大小写的问题。
一文教会你如何通过Docker安装elasticsearch和kibana 【详细过程+图解】
|
3月前
|
JSON 自然语言处理 数据库
Elasticsearch从入门到项目部署 安装 分词器 索引库操作
这篇文章详细介绍了Elasticsearch的基本概念、倒排索引原理、安装部署、IK分词器的使用,以及如何在Elasticsearch中进行索引库的CRUD操作,旨在帮助读者从入门到项目部署全面掌握Elasticsearch的使用。