【SLS开源兼容系列】使用ES SDK 访问SLS

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
应用实时监控服务-应用监控,每月50GB免费额度
可观测监控 Prometheus 版,每月50GB免费额度
简介: 本文介绍如何用es sdk访问sls

场景

小丁最近把公司的elk搬迁到了sls上,使用sls的es兼容功能,配置好了Kibana和Grafana,大家用得都很开心。

不过有一个问题比较困扰,原来有一个应用使用了es的接口进行数据查询。现在因为切换sls,面临改造的问题。

咨询了阿里云sls工程师后,发现原来可以用es sdk访问sls 兼容接口,这真是极大地方便了迁移呀。

以上场景,或许会在es迁移的过程中出现。如果有系统依赖es接口,想继续使用,可以连sls es兼容接口,少量配置修改即可对接起来。

SLS和ES概念对齐

ES概念

Host

格式为https://${project}.${slsEndpoint}/es/

Username

AccessKeyId

Password

AccessKeySecret

Index

${project}.${logstore}

上面的${project}、${logstore} 和${slsEndpoint}请根据实际情况替换

示例

下面以 project 为etl-dev、logstore为accesslog、slsEndpoint为cn-huhehaote.log.aliyuncs.com 为例,演示如何用es 的sdk访问

使用curl访问sls的es兼容接口

curl -u ${AccessKeyId}:${AccessKeySecret} \
 "https://etl-dev.cn-huhehaote.log.aliyuncs.com/es/etl-dev.accesslog/_search?q=status:200"

这里的AccessKeyId 和AccessKeySecret 需要替换为真实值

q=可以填写具体请求的query,支持Lucene Query的格式

使用Python es sdk访问sls的es兼容接口

安装依赖

pip install elasticsearch==7.10.0

样例

#!/bin/env python3
import os
import json
import time
from elasticsearch import Elasticsearch, helpers
slsProject = "etl-dev"
slsEndpoint = "cn-huhehaote.log.aliyuncs.com"
slsLogstore = "accesslog"
esHost = "https://%s.%s/es/" % (slsProject, slsEndpoint)
esIndex = "%s.%s" % (slsProject, slsLogstore)
# 从环境变量中获取到ak信息
accessKeyId = os.environ['ALIYUN_ACCESS_KEY_ID']
accessKeySecret = os.environ['ALIYUN_ACCESS_KEY_SECRET']
esClient = Elasticsearch(hosts=esHost,
                http_auth=(accessKeyId, accessKeySecret),
                   verify_certs=True, timeout=300)
endTime = int(time.time()*1000)
startTime = endTime - 3600*1000
r = esClient.search(
    index=esIndex,
    body=   {
        "query": {
            "bool": {
                "filter": [
                    {
                        "range": {
                            "@timestamp": {
                                "gte": startTime,
                                "lte": endTime,
                                "format": "epoch_millis"
                            }
                        }
                    }
                ]
            }
        }
     }
)
print(json.dumps(r, indent=4))

当然也可以使用python高阶封装的接口, 避免手工组装dsl,安装

pip install elasticsearch-dsl==7.4.1

使用 elasticsearch-dsl方式访问样例

#!/bin/env python3
import os
import json
import time
from elasticsearch import Elasticsearch, helpers
from elasticsearch_dsl import Search, Q
slsProject = "etl-dev"
slsEndpoint = "cn-huhehaote.log.aliyuncs.com"
slsLogstore = "accesslog"
esHost = "https://%s.%s/es/" % (slsProject, slsEndpoint)
esIndex = "%s.%s" % (slsProject, slsLogstore)
# 从环境变量中获取到ak信息
accessKeyId = os.environ['ALIYUN_ACCESS_KEY_ID']
accessKeySecret = os.environ['ALIYUN_ACCESS_KEY_SECRET']
esClient = Elasticsearch(hosts=esHost,
                http_auth=(accessKeyId, accessKeySecret),
                   verify_certs=True, timeout=300)
endTime = int(time.time()*1000)
startTime = endTime - 3600*1000
s = Search(using=esClient, index=esIndex) \
        .filter(Q("range", **{"@timestamp": {"gte": startTime, "lt": endTime, "format": "epoch_millis"}}))  \
        .query("match", request_method="GET") \
response = s.execute()
for hit in response:
    # request_method, host, client_ip 是sls日志中的字段
    print(hit.request_method, hit.host, hit.client_ip)

使用Golang es sdk 访问sls的es兼容接口

样例

package main
import (
  "context"
  "fmt"
  "os"
  "time"
  "github.com/olivere/elastic/v7"
)
func main() {
  // 下面是一个es sdk访问sls es 兼容接口的样例
  slsProject := "etl-dev"
  slsLogstore := "accesslog"
  slsEndpoint := "cn-huhehaote.log.aliyuncs.com"
  accessKeyID := os.Getenv("ALIYUN_ACCESS_KEY_ID")
  accessKeySecret := os.Getenv("ALIYUN_ACCESS_KEY_SECRET")
  esHost := fmt.Sprintf("https://%s.%s:443/es", slsProject, slsEndpoint)
  esIndex := fmt.Sprintf("%s.%s", slsProject, slsLogstore)
  esClient, err := elastic.NewClient(
    elastic.SetURL(esHost),
    elastic.SetSniff(false),
    elastic.SetBasicAuth(accessKeyID, accessKeySecret), // 设置基本认证的用户名和密码
    elastic.SetHealthcheck(false),                      // 关闭健康检查
  )
  if err != nil {
    panic(err)
  }
  termQuery := elastic.NewTermQuery("request_method", "GET")
  endTime := time.Now().Unix()
  startTime := endTime - 3600
  timeRangeQuery := elastic.NewRangeQuery("@timestamp").Gte(startTime).Lte(endTime)
  boolQuery := elastic.NewBoolQuery()
  boolQuery = boolQuery.Must(timeRangeQuery, termQuery)
  searchResult, err := esClient.Search().
    Index(esIndex).
    Query(boolQuery).
    From(0).Size(10).
    Pretty(true).
    Do(context.Background())
  if err != nil {
    panic(err)
  }
  // 输出结果
  for _, hit := range searchResult.Hits.Hits {
    fmt.Println(string(hit.Source))
  }
}

使用Java es sdk访问sls的es兼容接口

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>estest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.10.2</version>
        </dependency>
    </dependencies>
</project>

样例

package org.example;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
public class Main {
    public static void main(String[] args) throws IOException {
        String slsProject = "etl-dev";
        String slsLogstore = "accesslog";
        String slsEndpoint = "cn-huhehaote.log.aliyuncs.com";
        String schema = "https";
        String esHost = slsProject + "." +  slsEndpoint; // ${project}.${endpoint}
        int port = 443;
        String esIndex = slsProject + "." + slsLogstore; // ${project}.${logstore}
        String esPrefix = "/es/";
        String accessKeyId = System.getenv("ALIYUN_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIYUN_ACCESS_KEY_SECRET");
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(accessKeyId, accessKeySecret));
        RestClientBuilder builder = RestClient.builder(new HttpHost(esHost, port, schema)).setHttpClientConfigCallback(
                    httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        // Set /es/ prefix
        builder.setPathPrefix(esPrefix);
        RestHighLevelClient client = new RestHighLevelClient(builder);
        // Query
        BoolQueryBuilder boolExpr= new BoolQueryBuilder();
        long endTime = System.currentTimeMillis();
        long startTime = endTime - 3600 * 1000;
        boolExpr.filter().add(new MatchQueryBuilder("request_method", "GET"));
        boolExpr.filter().add(new RangeQueryBuilder("@timestamp").gte(startTime).lte(endTime).format("epoch_millis"));
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(boolExpr);
        SearchRequest searchRequest = new SearchRequest(esIndex);
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println(searchResponse.toString());
        client.close();
    }
}

使用PHP es sdk访问sls的es兼容接口

使用 composer 安装  https://getcomposer.org/download/

composer require elasticsearch/elasticsearch

样例

<?php
require 'vendor/autoload.php';
use Elasticsearch\ClientBuilder;
$slsProject = 'etl-dev';
$slsLogstore = 'accesslog';
$slsEndpoint = 'cn-huhehaote.log.aliyuncs.com';
$esHost = $slsProject . '.' . $slsEndpoint;
$esIndex = $slsProject . '.' . $slsLogstore;
$accessKeyId = getenv('ALIYUN_ACCESS_KEY_ID');
$accessKeySecret = getenv('ALIYUN_ACCESS_KEY_SECRET');
$hosts = [
    [
        'host' => $esHost,
        'port' => '443',
        'scheme' => 'https',
        'path' => '/es',
        'user' => $accessKeyId,
        'pass' => $accessKeySecret,
    ]
];
$client = ClientBuilder::create()
    ->setHosts($hosts)
    ->build();
$endTime = round(microtime(true) * 1000); // 毫秒
$startTime = $endTime - (3600 * 1000);
$params = [
    'index' => $esIndex,
    'body'  => [
        'query' => [
            'bool' => [
                'must' => [
                    'match' => [
                        'request_method' => 'GET'
                    ]
                ],
                'filter' => [
                    'range' => [
                        '@timestamp' => [
                            'gte' => $startTime,
                            'lte' => $endTime
                        ]
                    ]
                ]
            ]
        ]
    ]
];
$response = $client->search($params);
print_r($response);

小结

通过使用es标准的sdk,设置好正确的esHost、esIndex,可以很方便地访问sls的数据。期待您的使用~

参考

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
4月前
|
Java Apache 开发工具
【Azure 事件中心】 org.slf4j.Logger 收集 Event Hub SDK(Java) 输出日志并以文件形式保存
【Azure 事件中心】 org.slf4j.Logger 收集 Event Hub SDK(Java) 输出日志并以文件形式保存
|
2月前
FFmpeg【SDK01】日志和字典的使用
FFmpeg中日志功能的使用方法,包括日志级别的设置和AVDictionary的基本操作,同时展示了字符串解析函数如av_parse_video_size、av_parse_video_rate和av_parse_time的应用。
39 2
|
3月前
|
设计模式 SQL 安全
PHP中的设计模式:单例模式的深入探索与实践在PHP的编程实践中,设计模式是解决常见软件设计问题的最佳实践。单例模式作为设计模式中的一种,确保一个类只有一个实例,并提供全局访问点,广泛应用于配置管理、日志记录和测试框架等场景。本文将深入探讨单例模式的原理、实现方式及其在PHP中的应用,帮助开发者更好地理解和运用这一设计模式。
在PHP开发中,单例模式通过确保类仅有一个实例并提供一个全局访问点,有效管理和访问共享资源。本文详细介绍了单例模式的概念、PHP实现方式及应用场景,并通过具体代码示例展示如何在PHP中实现单例模式以及如何在实际项目中正确使用它来优化代码结构和性能。
49 2
|
4月前
|
Ubuntu Linux 测试技术
在Linux中,已知 apache 服务的访问日志按天记录在服务器本地目录/app/logs 下,由于磁盘空间紧张现在要求只能保留最近7天的访问日志,请问如何解决?
在Linux中,已知 apache 服务的访问日志按天记录在服务器本地目录/app/logs 下,由于磁盘空间紧张现在要求只能保留最近7天的访问日志,请问如何解决?
|
4月前
|
应用服务中间件 Linux nginx
在Linux中,如何统计ip访问情况?分析 nginx 访问日志?如何找出访问页面数量在前十位的ip?
在Linux中,如何统计ip访问情况?分析 nginx 访问日志?如何找出访问页面数量在前十位的ip?
|
5月前
|
存储 开发框架 前端开发
循序渐进VUE+Element 前端应用开发(31)--- 系统的日志管理,包括登录日志、接口访问日志、实体变化历史日志
循序渐进VUE+Element 前端应用开发(31)--- 系统的日志管理,包括登录日志、接口访问日志、实体变化历史日志
|
5月前
|
存储 并行计算 开发工具
SLS Prometheus存储问题之相比客户端SDK聚合写入,SLS网关侧聚合写入有什么优势
SLS Prometheus存储问题之相比客户端SDK聚合写入,SLS网关侧聚合写入有什么优势
|
4月前
|
开发工具 iOS开发 容器
【Azure Blob】关闭Blob 匿名访问,iOS Objective-C SDK连接Storage Account报错
【Azure Blob】关闭Blob 匿名访问,iOS Objective-C SDK连接Storage Account报错
|
4月前
|
网络安全
【Azure Service Bus】启用诊断日志来获取客户端访问Azure Service Bus的IP地址 [2024-03-26 实验结果失败]
【Azure Service Bus】启用诊断日志来获取客户端访问Azure Service Bus的IP地址 [2024-03-26 实验结果失败]
|
4月前
|
机器学习/深度学习 开发工具 Python
【Azure 应用服务】使用Python Azure SDK 来获取 App Service的访问限制信息(Access Restrictions)
【Azure 应用服务】使用Python Azure SDK 来获取 App Service的访问限制信息(Access Restrictions)