Spark搭档Elasticsearch

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: Spark与elasticsearch结合使用是一种常用的场景,小编在这里整理了一些Spark与ES结合使用的方法。

Spark与elasticsearch结合使用是一种常用的场景,小编在这里整理了一些Spark与ES结合使用的方法。
一、 write data to elasticsearch
利用elasticsearch Hadoop可以将任何的RDD保存到Elasticsearch,不过有个前提其内容可以翻译成文件。这意味着RDD需要一个Map/JavaBean/Scala case class
Scala
在Scala中只需要以下几步:

  1. Spark Scala imports
  2. Elasticsearch-hadoop Scala imports
  3. Start Spark through its Scala API
  4. makeRDD
  5. index content(内容索引) index ES under spark/docs
    下面是一个例子:

screenshot
Scala用户可能会使用SEQ和→符号声明根对象(即JSON文件)而不是使用Map。而类似的,第一个符号的结果略有不同,不能相匹配的一个JSON文件:序列是一阶序列(换句话说,一个列表),←会创建一个Tuple(元组),或多或少是一个有序的,元素的定数。例如,一个列表的列表不能作为一个文件,因为它不能被映射到一个JSON对象;但是它可以在一个自由的使用。因此在上面的例子Map(K→V)代替SEQ(K→V)
作为一种替代上面的隐式导入,elasticsearch-hadoop支持spark的Scala用户通过org.elasticsearch.spark.rdd包作为实用类允许显式方法调用EsSpark。此外,而不是地图(这是方便,但需要一个映射,每个实例,由于它们的结构的差异),使用一个case class:

  1. EsSpark importrs
  2. 定义一个Case class名叫Trip
  3. 利用Trip实例创建一个RDD
  4. 明确RDD的index通过EsSpark

例子:
screenshot

对于指定documents的id(或者其他类似于TTL或时间戳的元数据),可以设置名字为es.mapping.id的映射。下面以前的实例,Elasticsearch利用filed的id作为documents的id.更新RDD的配置configuration(也可以在SparkConf上设置全局的属性,不建议这样做)
screenshot
Writing existing to Elasticsearch
如果Rdd的数据已经在Json中,elasticsearch-hadoop允许直接索引而不需要任何转换,数据直接发送到Elasticsearch.这时候elasticsearch-hadoop希望RDD包含字符或者字节数组(string[]/byte[]),假设每个条目代表一个JSON文档。如果RDD没有正确的签名,这savejsontoes方法无法应用(在Scala中他们将不可用)。

screenshot
Writing to dynamic/multi-resources
当被写入ES的数据需要索引不同的buckets,可以利用es.resource.write,下面media的例子配置如下:
screenshot

  1. 用于拆分数据的文档。任何字段都可以被声明(但要确保它在所有的文件中都是可用的)
  2. 保存每个对象根据其资源的模式,在这个例子的基础上media_type
    每个文档或者对象被写入,Elasticsearch Hadoop将提取media_type字段,使用它的值来确定目标资源。

Handling document metadata
Elasticsearch允许每个文档有自己的元数据(metadata),正如上面所解释的,通过各种映射选项可以自定义这些参数,以便他们的值是从他们的归属文档中提取。我们甚至可以包括/排除哪些部分数据被备份到Elasticsearch,在Spark中,Elasticsearch Hadoop扩展此功能允许将元数据提供的外部文档本身给pair RDDS用。另一方面,对于包含key-value元组的RDDS,metadata可以从作为文档源的key-value中提取。
screenshot
当有更多的Id需要被指定时,可以使用scala.collection.Map来接收 org.elasticsearch.spark.rdd.Metadata的key的类型:
screenshot
当有更多的Id需要被指定时,可以使用ava.util.Map来接收 org.elasticsearch.spark.rdd.Metadata的key的类型:
screenshot
二、 Reading data from elasticsearch
读数据需要定义一个EsRDD,将数据流从ES读到Spark
screenshot
screenshot
该方法可以被重载来指定一个额外的查询或配置图(overriding sparkconf):
screenshot
从Elasticsearch返回的文件,默认情况下,作为一个tuple2,包含第一个元素是文档ID和第二个元素实际文件通过Scala集合来表示,名字类似于Map[Sting,Any],其中key是字段名称和value是各自的值。

elasticsearch-hadoop自动转换spark内置类型作为Elasticsearch类型,如下表:
screenshot
SaprkSQL on support
直接看下面的例子:
screenshot
screenshot

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
8月前
|
分布式计算 API Apache
Spark与Elasticsearch的集成与全文搜索
Spark与Elasticsearch的集成与全文搜索
|
消息中间件 分布式计算 Kafka
Rocketmq、Rabbitmq、Kafka、Mongo、Elasticsearch、Logstash、Kibana、Nacos、Skywalking、Seata、Spark、Zookeeper安装
Rocketmq、Rabbitmq、Kafka、Mongo、Elasticsearch、Logstash、Kibana、Nacos、Skywalking、Seata、Spark、Zookeeper安装
367 0
Rocketmq、Rabbitmq、Kafka、Mongo、Elasticsearch、Logstash、Kibana、Nacos、Skywalking、Seata、Spark、Zookeeper安装
|
分布式计算 搜索推荐 Spark
【Spark Summit East 2017】使用Spark和Elasticsearch构建数据集搜索引擎
本讲义出自Oscar Castaneda Villagran在Spark Summit East 2017上的演讲,主要介绍了利用内置了Elasticsearch的Spark集群使得在集群中的驱动节点上运行嵌入式Elasticsearch实例成为了可能,这就为开发更为先进的应用程序奠定了基础,其中一个应用就是数据集搜索。
3882 0
|
分布式计算 Spark MaxCompute
【Spark Summit EU 2016】在Spark集群中内置Elasticsearch
本讲义出自Oscar Castaneda在Spark Summit EU上的演讲,在使用ES-Hadoop进行开发的过程中,使Elasticsearch运行在Spark集群外部是一件非常繁琐的事情,为了在开发过程中更好地Elasticsearch实例,并且尽可能地降低开发团队之间的依赖关系,使用ES快照作为团队合作的接口,并且提高QA的效率,所提提出了在Spark集群中内置Elasticsearch的方式。
3054 0
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
204 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
86 0
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
60 0
|
3月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
118 0
|
2月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
128 6
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
156 2