带你读《Elastic Stack 实战手册》之47:——3.5.6.Datastream (4)

简介: 带你读《Elastic Stack 实战手册》之47:——3.5.6.Datastream (4)

《Elastic Stack 实战手册》——三、产品能力——3.5 进阶篇——3.5.6.Datastream (3) https://developer.aliyun.com/article/1228581


使用 data stream

 

此处对数据流的操作主要以命令为主,Kibana 界面支持较少。

 

新增数据

 

data stream 在新增数据时是只追加的模式,因此在固定 id 和 bulk 的模式下,op_type 是指定 create 的。

 

如下面命令:


POST my-data-stream/_create/1
{"@timestamp":"2020-12-07T11:06:07.000Z","test":1}

或者

PUT /my-data-stream/_bulk?refresh
{"create":{ }}
{ "@timestamp": "2020-12-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{ }}
{ "@timestamp": "2020-12-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{ }}
{ "@timestamp": "2020-12-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }

如果并不指定,文档的 id,则可以使用默认的 _doc ,如下:

 

POST my-data-stream/_doc/
{"@timestamp":"2020-12-07T11:06:07.000Z","test":1}

获取 data stream 状态

 

使用 data stream stats API 查看 data stream 的状态。

GET /_data_stream/my-data-stream/_stats?human=true

Response:

{
  "_shards" : {
"total" : 4,
  "successful" : 2,
    "failed" : 0
  },
  "data_stream_count" : 1,
  "backing_indices" : 1,
  "total_store_size" : "5kb",
  "total_store_size_bytes" : 5151,
  "data_streams" : [
    {
      "data_stream" : "my-data-stream",
      "backing_indices" : 1,
      "store_size" : "5kb",
      "store_size_bytes" : 5151,
      "maximum_timestamp" : 1607252645000
    }
  ]
}


可见 my-data-stream 的大下和后备索引数量。

 

同时需要用 _ilm/explain 获取 data stream 后备索引所在的 ILM 策略状态。

 

GET my-data-stream/_ilm/explain

 

结果:

 

{
 "indices" : {
    ".ds-my-data-stream-000001" : {
      "index" : ".ds-my-data-stream-000001",
      "managed" : true,
      "policy" : "my-data-stream-policy",
       "lifecycle_date_millis" : 1620907943375,
      "age" : "7.95s",
      "phase" : "hot",
      "phase_time_millis" : 1620907943567,
      "action" : "rollover",
      "action_time_millis" : 1620907943661,
      "step" : "check-rollover-ready",
      "step_time_millis" : 1620907943661,
      "phase_execution" : {
        "policy" : "my-data-stream-policy",
        "phase_definition" : {
          "min_age" : "0ms",
          "actions" : {
            "rollover" : {
              "max_size" : "25gb"
            }
          }
        },
        "version" : 1,
        "modified_date_in_millis" : 1620907939978
      }
    }
  }
}

上图可见,这个数据的 000001 索引主要处于 hot 阶段,策略名称是 logs 等信息。具体参数可见于 ILM 的相关定义。

 

手动 rollover data stream

 

使用 rollover API,手动 rollover data stream。


POST my-data-stream/_rollover

结果:

{
  "acknowledged" : true,
  "shards_acknowledged" : true,
  "old_index" : ".ds-my-data-stream-000001",
  "new_index" : ".ds-my-data-stream-000002",
  "rolled_over" : true,
  "dry_run" : false,
  "conditions" : { }
}

再 GET 相关 data stream 状态,后备索引增加。

GET /_data_stream/my-data-stream/

结果:

{
  "data_streams" : [
    {
      "name" : "my-data-stream",
      "timestamp_field" : {
        "name" : "@timestamp"
      },
      "indices" : [
        {
          "index_name" : ".ds-my-data-stream-000001",
          "index_uuid" : "AJBi0g3fRyG8-1tiH2UD2Q"
        },
        {
          "index_name" : ".ds-my-data-stream-000002",
          "index_uuid" : "AgOLGMSBSYWb4X-ID8uwtg"
        }
      ],
      "generation" : 2,
      "status" : "GREEN",
      "template" : "my-data-stream-template",
      "ilm_policy" : "my-data-stream-policy",
      "hidden" : false
    }
  ]
}


Reindex data stream

 

使用 reindex API 去复制数据到一个 data stream。由于 data stream 的只追加特性,在

op_type 中要选择为 create 。

POST /_reindex
{
  "source": {
    "index": "test"
  },
  "dest": {
    "index": "my-data-stream",
    "op_type": "create"
  }
}

结果:

 

{
  "took" : 80,
  "timed_out" : false,
  "total" : 1,
  "updated" : 0,
  "created" : 1,
  "deleted" : 0,
  "batches" : 1,
  "version_conflicts" : 0,
  "noops" : 0,
  "retries" : {
    "bulk" : 0,
    "search" : 0
  },
  "throttled_millis" : 0,
  "requests_per_second" : -1.0,
  "throttled_until_millis" : 0,
  "failures" : [ ]
}

Delete/Update by query

 

针对 data stream 只能 delete/update by query。

 

相关命令:


POST /my-data-stream/_update_by_query
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
}
  },
  "script": {
 "source": "ctx._source.user.id = params.new_id",
    "params": {
      "new_id": "XgdX0NoX"
    }
  }
}
POST /my-data-stream/_delete_by_query
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}

Delete update 后备索引数据

 

在后备索引删除或者修改,需要注意下面三个要素:

 

l 文档 id。

l 文档所在的后备索引。

l 如果是修改文档,则需要其 _seq_no 和 _primary_term 两个参数。

 

主要操作如下:

 

先获取文档所需的要素信息,设置 seq_no_primary_term 为 true。

GET /my-data-stream/_search
{
  "seq_no_primary_term": true,
  "query": {
    "match": {
      "message": "Login attempt failed"
        }
  }
}

 获得结果:

{
  "took" : 621,
  "timed_out" : false,
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 0.8630463,
    "hits" : [
      {
        "_index" : ".ds-my-data-stream-000001",
        "_type" : "_doc",
        "_id" : "9ZakZXkBkhA9X9yUZo2P",
        "_seq_no" : 0,
        "_primary_term" : 1,
        "_score" : 0.8630463,
        "_source" : {
          "@timestamp" : "2020-12-06T11:04:05.000Z",
          "user" : {
            "id" : "vlb44hny"
          },
          "message" : "Login attempt failed"
             }
      }
    ]
  }
}

然后修改命令:

PUT /.ds-my-data-stream-000001/_doc/9ZakZXkBkhA9X9yUZo2P?if_seq_no=0&if_primary_term=1
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "test": 4
}

结果:

{
  "_index" : ".ds-my-data-stream-000001",
  "_type" : "_doc",
  "_id" : "9ZakZXkBkhA9X9yUZo2P",
  "_version" : 2,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 1,
  "_primary_term" : 1
}

或者删除命令:

DELETE  /.ds-logs-1-1-000002/_doc/3

《Elastic Stack 实战手册》——三、产品能力——3.5 进阶篇——3.5.6.Datastream (5) https://developer.aliyun.com/article/1228578

相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。  
相关文章
|
SQL 人工智能 自然语言处理
Copilot的优势
【2月更文挑战第13天】Copilot的优势
581 5
Copilot的优势
|
JSON 监控 数据管理
【Elasticsearch专栏 12】深入探索:Elasticsearch使用索引生命周期管理(ILM)自动化删除旧数据
Elasticsearch的ILM功能允许用户定义策略,自动管理索引从创建到删除的生命周期。用户可以设置策略,根据索引年龄或大小自动删除旧数据,节省存储空间。通过应用ILM策略于索引模板,新索引将遵循预定义的生命周期。用户还可以监控ILM状态,确保策略按预期执行。使用ILM,用户可以高效地管理数据,确保旧数据及时删除,同时保持数据完整性和安全性。
720 3
|
JavaScript
JS中every的简单使用
JS中every的简单使用
|
Java Shell 应用服务中间件
超详细总结docker镜像
超详细总结docker镜像
418 0
|
Web App开发 JavaScript 前端开发
JavaScript中的性能优化:代码优化技巧与性能分析工具
【4月更文挑战第22天】本文探讨JavaScript性能优化,包括代码优化技巧和性能分析工具。建议避免全局查找、减少DOM操作、使用事件委托、优化循环和异步编程以提升代码效率。推荐使用Chrome DevTools、Lighthouse和jsPerf等工具进行性能检测和优化。持续学习和实践是提升JavaScript应用性能的关键。
|
11月前
|
存储 Linux
logstash与Rsyslog安装配置
通过将Logstash和Rsyslog结合使用,可以实现强大的日志收集和处理功能。Rsyslog负责接收和转发系统日志,Logstash负责解析和存储日志数据。以上指南提供了详细的安装和配置步骤,确保了两者能够无缝协作,以满足各种日志管理需求。希望本文能帮助你在实际项目中高效地部署和使用Logstash与Rsyslog。
326 8
|
10月前
|
机器学习/深度学习 人工智能 机器人
智能体零样本解决未见过人类设计环境!全靠这个开放式物理RL环境空间
在人工智能领域,训练通用智能体以应对未知环境是巨大挑战。近期研究通过Kinetix——一个开放式物理强化学习(RL)环境空间,取得了突破。Kinetix由Michael Matthews等人提出,生成数千万个2D物理任务,训练出能零样本解决未见过环境的智能体。借助新型硬件加速物理引擎Jax2D,研究团队高效模拟数十亿环境步骤,使智能体在多样化环境中学习一般性机械属性,展现出出色的零样本和微调能力。论文地址:https://arxiv.org/pdf/2410.23208
204 3
C语言实现2048小游戏---粤嵌GE6818嵌入式系统实训
C语言实现2048小游戏---粤嵌GE6818嵌入式系统实训
|
机器学习/深度学习 编解码 算法
在线打开CAD或Solidworks的STP文件,通过以图搜图与实物比对搜索
智能比对系统利用大模型技术,实现设计图纸与实物的高效、精准比对。系统支持在线3D模型解析、多视图图片自动生成、实物照片智能比对及实时偏差标注,全面提升机械制造行业的设计、生产和质量控制效率。
722 2
|
人工智能 物联网 数据安全/隐私保护
操作系统的演变与未来:从单一到多元的演进之路
本文旨在探索操作系统的演化历程及其对未来技术发展的影响。通过分析不同时代的操作系统特点,我们能够理解现代操作系统设计的复杂性和多样性。文章将重点讨论操作系统如何适应新的硬件架构、满足日益增长的性能需求,并应对安全性和隐私保护的挑战。最后,我们将展望操作系统的未来发展趋势,包括人工智能和物联网等新兴技术的融合。
560 0

热门文章

最新文章