4.2.3.CDN流媒体服务实时分析Elasticsearch实践
创作人:吴斌
审稿人:李捷
发挥 Elastic Stack 在日志和实时数据分析计算领域的一些优势,对流媒体服务这样规模较大、实时性要求偏高,且分析、业务探索流程要求灵活的业务是一个比较百搭的选择。
数据逻辑架构如下:
整体架构比较直观简单。我们省去了业务组建和存储层可能会用到的其他引擎,把目光主要集中在 Elasticsearch 上。
日志采集
日志的收集在正式进入数据管道前,可以选择落地或者直接吐到消息队列。这里采集的内容也主要分成 2 大部分:
l CDN/网络访问日志
l 业务打点数据
业务打点的数据可以根据需求采集,实时部分主要会聚焦在,例如卡顿等这样的用户体验指标。网络访问的日志通常比较通用,这里我们也先给出一个例子,相信大家看上去会比较熟悉。
那么根据这里样例的数据,Elasticsearch 可以轻松的利用内置的 processor 和聚合功能做快速的分析,后面我们会举例说明。
{ "receiveTimestamp": "2021-04-28T14:30:17.90993285Z", "spanId": "blahblah", "trace": "blah/f5c7578feaf277dd9a8d96", "@timestamp": "2021-04-28T14:30:17.549287Z", "logName": "logs/requests", "jsonPayload": { "@type": "loadbalancing.type.LoadBalancerLogEntry", "latencySeconds": "0.001749s", "statusDetails": "response_from_cache", "cacheIdCityCode": "ABC", "cacheId": "ABC-abcabc123" }, "httpRequest": { "remoteIp": "10.0.0.1", "remoteIpIsp": { "ip": "10.0.0.2", "organization_name": "China Telecom", "asn": 8346, "network": "10.0.0.0/15" }, "requestMethod": "GET", "responseSize": "125621", "userAgent": "Mozilla/5.0 (Linux; Android 10) Bindiego/7.1-1840", "frontendSrtt": 0.124, "cacheLookup": true, "geo": { "continent_name": "Asia", "country_iso_code": "CN", "country_name": "China", "location": { "lon": 123, "lat": 321 } }, "backendLatency": 0.001749, "requestUrl": "http://bindiego.com/vid/bindigo.m4s", "requestDomain": "bindiego.com", "cacheHit": true, "requestSize": "671", "requestProtocol": "http", "user_agent": { "original": "Mozilla/5.0 (Linux; Android 10) Bindiego/7.1-1840", "os": { "name": "Android", "version": "10", "full": "Android 10" }, "name": "Android", "device": { "name": "Generic Smartphone" }, "version": "10" }, "status": 200, "resourceType": "m4s" } }
这里就是最终导入到 Elasticsearch 里可分析的网络性能数据。针对这个数据,我们分别对它经过的管道和处理做一个简单快速的剖析。
消息队列
日志采集器会直接把数据打到消息队列,这里主要起到一个抗反压缓冲的作用。有些队列有很多附加的功能,例如存储和窗口计算,这里我们只使用最单纯的功能。因为后面我们选取了分布式计算引擎来做这这部分。
分布式计算引擎
分布式计算引擎,其实在整体实时数据分析业务里,扮演的着实是非常重要的角色。例如实时指标的窗口计算,迟报数据的修正等等。但在我们这个简单的场景下为了后面在 Elasticsearch内更方便快捷的分析、过滤数据。
我们这里主要做了 ETL 和补全。例如把请求资源的域和资源类型提取出来,还有 CDN 缓存节点的区域代码等等。但是例如 IP 地址地理位置、用户设备类型和运营商(ISP)的反查,方便起见,我们利用了 Elasticsearch Ingest 节点预置的 Pipeline 去做。
这里要注意的就是,如果你的集群配置是全角色的节点,会对数据节点的性能有影响。建议使用独立的 ingest node 去做,且如果是在 K8S 上部署的话,还可以弹性扩容这组 nodeSet。
下面是 Ingest 节点配置举例:
完整代码戳这里:https://github.com/cloudymoma/raycom/blob/gcp-lb-log/scripts/elastic/index-gclb-pipeline.json
{ "description": "IP & user agent lookup", "processors": [ { "user_agent" : { "field" : "httpRequest.userAgent", "target_field" : "httpRequest.user_agent", "ignore_missing": true } }, { "geoip" : { "field" : "httpRequest.remoteIp", "target_field" : "httpRequest.geo", "ignore_missing": true } }, { "geoip" : { "field" : "httpRequest.remoteIp", "target_field" : "httpRequest.remoteIpIsp", "database_file" : "GeoLite2-ASN.mmdb", "ignore_missing": true } } ] }
数据安全
数据安全这块顺带提一下,现在 Elasticsearch 的认证、授权都可以在 Basic License 里使用了,非常方便。这里简单提一下通讯这块,很多小伙伴用的是自签的证书。这个问题不大,经常被问到在使用 RestClient 开发的时候如何绕过去(例如在写计算引擎最后入库的时候)。其实方法也很简单,这里就给大家上个代码片段说明看下。
配置:https://github.com/elasticsearch-cn/elastic-on-gke#option-2-regional-tcp-lb
try { SSLContext context = SSLContext.getInstance("TLS"); context.init(null, new TrustManager[] { new X509TrustManager() { public void checkClientTrusted(X509Certificate[] chain, String authType) {} public void checkServerTrusted(X509Certificate[] chain, String authType) {} public X509Certificate[] getAcceptedIssuers() { return null; } } }, null); httpAsyncClientBuilder.setSSLContext(context) .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE); } catch (NoSuchAlgorithmException ex) { logger.error("Error when setup dummy SSLContext", ex); } catch (KeyManagementException ex) { logger.error("Error when setup dummy SSLContext", ex); } catch (Exception ex) { logger.error("Error when setup dummy SSLContext", ex); }
《Elastic Stack 实战手册》——四、应用实践——4.2 可观测性应用场景 ——4.2.3.CDN流媒体服务实时分析Elasticsearch实践(下) https://developer.aliyun.com/article/1225999