带你读《Elastic Stack 实战手册》之47:——3.5.6.Datastream (4)

简介: 带你读《Elastic Stack 实战手册》之47:——3.5.6.Datastream (4)

《Elastic Stack 实战手册》——三、产品能力——3.5 进阶篇——3.5.6.Datastream (3) https://developer.aliyun.com/article/1228581


使用 data stream

 

此处对数据流的操作主要以命令为主,Kibana 界面支持较少。

 

新增数据

 

data stream 在新增数据时是只追加的模式,因此在固定 id 和 bulk 的模式下,op_type 是指定 create 的。

 

如下面命令:


POST my-data-stream/_create/1
{"@timestamp":"2020-12-07T11:06:07.000Z","test":1}

或者

PUT /my-data-stream/_bulk?refresh
{"create":{ }}
{ "@timestamp": "2020-12-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{ }}
{ "@timestamp": "2020-12-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{ }}
{ "@timestamp": "2020-12-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }

如果并不指定,文档的 id,则可以使用默认的 _doc ,如下:

 

POST my-data-stream/_doc/
{"@timestamp":"2020-12-07T11:06:07.000Z","test":1}

获取 data stream 状态

 

使用 data stream stats API 查看 data stream 的状态。

GET /_data_stream/my-data-stream/_stats?human=true

Response:

{
  "_shards" : {
"total" : 4,
  "successful" : 2,
    "failed" : 0
  },
  "data_stream_count" : 1,
  "backing_indices" : 1,
  "total_store_size" : "5kb",
  "total_store_size_bytes" : 5151,
  "data_streams" : [
    {
      "data_stream" : "my-data-stream",
      "backing_indices" : 1,
      "store_size" : "5kb",
      "store_size_bytes" : 5151,
      "maximum_timestamp" : 1607252645000
    }
  ]
}


可见 my-data-stream 的大下和后备索引数量。

 

同时需要用 _ilm/explain 获取 data stream 后备索引所在的 ILM 策略状态。

 

GET my-data-stream/_ilm/explain

 

结果:

 

{
 "indices" : {
    ".ds-my-data-stream-000001" : {
      "index" : ".ds-my-data-stream-000001",
      "managed" : true,
      "policy" : "my-data-stream-policy",
       "lifecycle_date_millis" : 1620907943375,
      "age" : "7.95s",
      "phase" : "hot",
      "phase_time_millis" : 1620907943567,
      "action" : "rollover",
      "action_time_millis" : 1620907943661,
      "step" : "check-rollover-ready",
      "step_time_millis" : 1620907943661,
      "phase_execution" : {
        "policy" : "my-data-stream-policy",
        "phase_definition" : {
          "min_age" : "0ms",
          "actions" : {
            "rollover" : {
              "max_size" : "25gb"
            }
          }
        },
        "version" : 1,
        "modified_date_in_millis" : 1620907939978
      }
    }
  }
}

上图可见,这个数据的 000001 索引主要处于 hot 阶段,策略名称是 logs 等信息。具体参数可见于 ILM 的相关定义。

 

手动 rollover data stream

 

使用 rollover API,手动 rollover data stream。


POST my-data-stream/_rollover

结果:

{
  "acknowledged" : true,
  "shards_acknowledged" : true,
  "old_index" : ".ds-my-data-stream-000001",
  "new_index" : ".ds-my-data-stream-000002",
  "rolled_over" : true,
  "dry_run" : false,
  "conditions" : { }
}

再 GET 相关 data stream 状态,后备索引增加。

GET /_data_stream/my-data-stream/

结果:

{
  "data_streams" : [
    {
      "name" : "my-data-stream",
      "timestamp_field" : {
        "name" : "@timestamp"
      },
      "indices" : [
        {
          "index_name" : ".ds-my-data-stream-000001",
          "index_uuid" : "AJBi0g3fRyG8-1tiH2UD2Q"
        },
        {
          "index_name" : ".ds-my-data-stream-000002",
          "index_uuid" : "AgOLGMSBSYWb4X-ID8uwtg"
        }
      ],
      "generation" : 2,
      "status" : "GREEN",
      "template" : "my-data-stream-template",
      "ilm_policy" : "my-data-stream-policy",
      "hidden" : false
    }
  ]
}


Reindex data stream

 

使用 reindex API 去复制数据到一个 data stream。由于 data stream 的只追加特性,在

op_type 中要选择为 create 。

POST /_reindex
{
  "source": {
    "index": "test"
  },
  "dest": {
    "index": "my-data-stream",
    "op_type": "create"
  }
}

结果:

 

{
  "took" : 80,
  "timed_out" : false,
  "total" : 1,
  "updated" : 0,
  "created" : 1,
  "deleted" : 0,
  "batches" : 1,
  "version_conflicts" : 0,
  "noops" : 0,
  "retries" : {
    "bulk" : 0,
    "search" : 0
  },
  "throttled_millis" : 0,
  "requests_per_second" : -1.0,
  "throttled_until_millis" : 0,
  "failures" : [ ]
}

Delete/Update by query

 

针对 data stream 只能 delete/update by query。

 

相关命令:


POST /my-data-stream/_update_by_query
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
}
  },
  "script": {
 "source": "ctx._source.user.id = params.new_id",
    "params": {
      "new_id": "XgdX0NoX"
    }
  }
}
POST /my-data-stream/_delete_by_query
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}

Delete update 后备索引数据

 

在后备索引删除或者修改,需要注意下面三个要素:

 

l 文档 id。

l 文档所在的后备索引。

l 如果是修改文档,则需要其 _seq_no 和 _primary_term 两个参数。

 

主要操作如下:

 

先获取文档所需的要素信息,设置 seq_no_primary_term 为 true。

GET /my-data-stream/_search
{
  "seq_no_primary_term": true,
  "query": {
    "match": {
      "message": "Login attempt failed"
        }
  }
}

 获得结果:

{
  "took" : 621,
  "timed_out" : false,
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 0.8630463,
    "hits" : [
      {
        "_index" : ".ds-my-data-stream-000001",
        "_type" : "_doc",
        "_id" : "9ZakZXkBkhA9X9yUZo2P",
        "_seq_no" : 0,
        "_primary_term" : 1,
        "_score" : 0.8630463,
        "_source" : {
          "@timestamp" : "2020-12-06T11:04:05.000Z",
          "user" : {
            "id" : "vlb44hny"
          },
          "message" : "Login attempt failed"
             }
      }
    ]
  }
}

然后修改命令:

PUT /.ds-my-data-stream-000001/_doc/9ZakZXkBkhA9X9yUZo2P?if_seq_no=0&if_primary_term=1
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "test": 4
}

结果:

{
  "_index" : ".ds-my-data-stream-000001",
  "_type" : "_doc",
  "_id" : "9ZakZXkBkhA9X9yUZo2P",
  "_version" : 2,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 1,
  "_primary_term" : 1
}

或者删除命令:

DELETE  /.ds-logs-1-1-000002/_doc/3

《Elastic Stack 实战手册》——三、产品能力——3.5 进阶篇——3.5.6.Datastream (5) https://developer.aliyun.com/article/1228578

相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。  
相关文章
|
JSON 监控 数据管理
【Elasticsearch专栏 12】深入探索:Elasticsearch使用索引生命周期管理(ILM)自动化删除旧数据
Elasticsearch的ILM功能允许用户定义策略,自动管理索引从创建到删除的生命周期。用户可以设置策略,根据索引年龄或大小自动删除旧数据,节省存储空间。通过应用ILM策略于索引模板,新索引将遵循预定义的生命周期。用户还可以监控ILM状态,确保策略按预期执行。使用ILM,用户可以高效地管理数据,确保旧数据及时删除,同时保持数据完整性和安全性。
932 3
|
SQL 人工智能 自然语言处理
Copilot的优势
【2月更文挑战第13天】Copilot的优势
657 5
Copilot的优势
LocalDateTime的全局自定义序列化
LocalDateTime的全局自定义序列化
|
JavaScript
JS中every的简单使用
JS中every的简单使用
|
Java Shell 应用服务中间件
超详细总结docker镜像
超详细总结docker镜像
461 0
|
5月前
|
监控 测试技术 API
人为漏测防不住?让Dify工作流成为你的“测试策略大脑”,7x24小时在线排查
在软件测试中,人为疏漏难以避免。本文介绍如何用Dify工作流构建“测试策略大脑”,将专家经验固化为自动化分析系统,实现代码变更智能评估、测试重点推荐,7x24小时守护质量,让测试更精准高效。
|
Web App开发 JavaScript 前端开发
JavaScript中的性能优化:代码优化技巧与性能分析工具
【4月更文挑战第22天】本文探讨JavaScript性能优化,包括代码优化技巧和性能分析工具。建议避免全局查找、减少DOM操作、使用事件委托、优化循环和异步编程以提升代码效率。推荐使用Chrome DevTools、Lighthouse和jsPerf等工具进行性能检测和优化。持续学习和实践是提升JavaScript应用性能的关键。
|
4月前
|
存储 缓存 Java
SpringBoot自动装配机制
SpringBoot通过@SpringBootApplication实现自动装配,其核心为@AutoConfigurationPackage与@AutoConfigurationImportSelector。前者注册主包路径,后者加载spring.factories中配置的自动配置类,结合@ComponentScan与过滤机制,实现Bean的自动扫描、去重与注入,简化开发配置。
335 1
C语言实现2048小游戏---粤嵌GE6818嵌入式系统实训
C语言实现2048小游戏---粤嵌GE6818嵌入式系统实训
|
机器学习/深度学习 编解码 算法
在线打开CAD或Solidworks的STP文件,通过以图搜图与实物比对搜索
智能比对系统利用大模型技术,实现设计图纸与实物的高效、精准比对。系统支持在线3D模型解析、多视图图片自动生成、实物照片智能比对及实时偏差标注,全面提升机械制造行业的设计、生产和质量控制效率。
888 2

热门文章

最新文章