作者:张铭辉(希铭)
01 前言:WebSocket 的技术演进与时代价值
1.1 什么是 WebSocket?
WebSocket 是一种基于 TCP 协议的全双工通信协议(RFC 6455[1]),通过一次 HTTP 握手即可建立持久化连接,实现客户端与服务端的双向数据传输。以下是一次 WebSocket 通信的示意图[2]:
可以看到,和 HTTP 不同,Client 会先向 Server 端基于 HTTP 协议发起一次握手请求,Server 返回响应握手成功。在这之后,已有的 TCP 连接会被升级为 WebSocket 连接,Client 和 Server 之间可以进行全双工通信。TCP 连接会一直持续到其中一侧认为需要关闭,且对方同意关闭之时。
为了更好理解后续 WebSocket 的全链路可观测方案,有必要对 WebSocket 的协议细节进行解读,本节剩余内容部分翻译 + 总结自 WebSocket Protocol[3]。
1.1.1 URI 格式与语法
和 HTTP 协议族非常类似,WebSocket 也有普通协议和他的安全版本,用 ws 和 wss 来区分,wss 的安全也采用 TLS 协议实现。由于 WebSocket 依赖 HTTP 协议进行握手,后续复用原 TCP 连接,故 WebSocket 默认的端口也是 80(ws)和 443(wss)。URI 整体的格式也和 HTTP 非常类似。
1.1.2 启动连接握手(基于 HTTP/1.1)
传统的 WebSocket 握手是一次典型的 HTTP 请求/响应。客户端主动发起一个 WebSocket 握手请求(一个特殊的 GET),如果服务器支持且允许使用 WebSocket 协议通信,则会返回一个 WebSocket 握手响应。WebSocket Connection 就建立起来了。
握手请求包含以下头:
如果服务端接受 WebSocket 协议,则发送一个 StatusCode 为 101 的响应:
HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
响应包括:
- HTTP/1.1 101 Switching Protocols:表示成功从 HTTP 升级到 WebSocket。
- Upgrade: websocket:确认协议升级。
- Connection: Upgrade:表示连接已升级。
- Sec-WebSocket-Accept:一个根据客户端的 Sec-WebSocket-Key 计算出的值,用于验证服务器理解了 WebSocket 握手请求。
HTTP/2 与 HTTP/3 升级到 WebSocket 的过程有一些不同,但不是本文讨论的关键,在此不再赘述,欢迎阅读 WebSocket Protocol 原文[3]。
1.1.3 WebSocket 消息与数据帧
在握手完毕后,连接会被升级为 WebSocket 连接,此时客户端和服务端可以随时双向发送 WebSocket 消息(message),用来交换数据和指令。WebSocket 中的最小通信单元是数据帧,每个消息有可能由一个或者多个数据帧组成。
数据帧根据其用途可以分为以下三种类型:
- 文本帧:载荷为 UTF-8 编码的文本数据
- 二进制帧:载荷为二进制数据
- 控制帧:用于传递协议信号,如 ping、pong、close 帧等
一个数据帧的数据组成如下图所示:
关于数据帧中每段数据的含义,如有兴趣,欢迎阅读 WebSocket Protocol 原文 [3]。
1.1.4 关闭连接握手
当客户端或服务端某一方认为连接可以关闭时,会向对端发送一个关闭帧(是控制帧的一种),对端收到关闭帧后会尽快发送另一个关闭帧作为响应。发送完关闭帧后,该端不应该再发送任何数据帧。双方交换完关闭帧后,TCP 连接将关闭。
1.2 为什么用 WebSocket?
不难看出,WebSocket 核心特性体现在:
- 长连接保持:连接建立后持续存在,避免重复握手开销
- 双向数据通道:客户端与服务端可随时发送数据帧(Text/Binary)
- 低延迟特性:省去 HTTP 轮询的请求头传输成本
- 消息分帧机制:支持超大数据量的分片传输(单帧最大 2^64 字节)
与传统 HTTP 协议对比,WebSocket 在通信模式上实现了根本性突破:
这种协议特性使其成为大数据量下实时通信场景的首选方案。
1.3 AI 时代 WebSocket 协议的复兴
随着大模型技术的爆发,越来越多需要实时交互的场景开始出现,智能化赋予了 WebSocket 协议新的活力:
- 支持实时对话与交互的智能客服或机器人
- 车载 AI 助手与云端模型实时交互
- 自动翻译、智能识图的 AI 智能眼镜
除实时性外,WebSocket 为有状态的连接,多轮对话的记忆保持、即时打断输出等功能也比传统的 HTTP 更加容易实现。到目前为止,主流的大模型提供商大多都提供了 WebSocket 的交互 API 及配套的 SDK,帮助用户更好地构建后端服务系统,例如:
- OpenAI 支持基于 WebSocket 的 Realtime API[4]
- 百炼大模型服务平台发布基于 WebSocket 的实时多模态交互协议[5]
- Google Gemini 支持基于 WebSocket 的 Live API[6]
WebSocket 在赋能 AI 应用实时性的同时,也为应用系统的可观测性带来了很大的挑战。WebSocket 协议高度的灵活性与扩展性注定了它不能像 HTTP 和 gRPC 那样非常方便地做到全链路可观测,本文接下来将具体分析 WebSocket 场景下全链路可观测的实现痛点与解决方案。
02 WebSocket 全链路可观测痛点分析
2.1 协议灵活性带来的链路追踪困境
2.1.1 链路信息注入难
对于常规的 HTTP 调用,为了保证链路的连通性,调用方会在 HTTP headers 中额外添加一组用于承载链路上下文的键值对,确保被调用方在解析协议时能够正确地还原调用方的链路上下文,进而保证上下文可以被继续传递下去。图示是使用 W3C 链路追踪协议[7]时,链路上下文的 header 的一个具体示例:
而在 1.1.3 节我们了解到,一个 WebSocket 数据帧其实仅由数字节的控制位和数据载荷构成。除建立连接时握手以外,没有其他的机会传输 header 这些元数据。因此,传统 OpenTelemetry 的 W3C 链路上下文无法直接植入每个数据帧中。而在实际应用场景中,对于一次 WebSocket 连接,往往并不代表仅一次 WebSocket 调用,仅依赖建立连接时的 HTTP 请求与响应是远远不够的。同时,这也牵扯出第二个困难——Span 作用域界定模糊。
2.1.2 Span 作用域界定模糊
在可观测领域,我们一般把调用链路上一次关键的操作称为一条 Span(跨度)[8],一条调用链一般由一组树状结构的 Span 组成。在可观测前端的帮助下,我们可以把同属于一条调用链的 Span 召回,并根据父子关系(也就是调用关系)以及发生时间渲染为下图所示的瀑布图,以此来帮助我们了解一条链路发生的所有关键操作以及调用关系。
然而,在 WebSocket 场景下,操作粒度的定义可以非常灵活。如图所示,一个 Span 有可能对应一次 WebSocket 连接从开始到结束的全过程,也有可能对应每一次消息的收发,甚至也可以对应每一次数据帧的传递过程。对 Span 粒度定义的高度灵活也导致了链路上下文在注入与管理上也会有非常大的变化,这也增大了业务上落地的难度。
2.1.3 链路上下文的反向扩散问题
虽然我们根据 WebSocket 连接的发起方与接收方将两端分为了 Client 和 Server,但实际业务的处理过程是高度灵活的双向流,可能存在由 Server 侧发起请求,Client 进行处理的情况。例如,允许 Client 主动与 Server 建立连接并将自身服务注册给 Server 端,由 Server 发送消息来对 Client 进行回调。对于这种交互方式而言,消息生产方(调用方)是 Server,消费方(被调用方)是 Client,因此链路上下文应该由 Server 注入到消息中,由 Client 还原并进一步传递。
2.2 异步调用引发的断链危机
在 WebSocket 应用中,为了提高连接利用率,两端也常用异步的方式来解耦消息接收过程与处理过程,以下是一个典型的异步消息处理架构。在这个过程中,消息有可能会直接被提交到线程池,也有可能存放在一个进程内的队列,甚至直接写入 Redis 等外部存储。这种灵活多变的异步方式也给链路上下文的进程内透传带来了困难,非常容易出现断链问题。
03 基于 LoongSuite 的全链路观测最佳实践
3.1 方案基本原理
通过上两节的讨论,我们可以得到两个基本结论:
- WebSocket 的用法相当灵活,链路追踪的实现很大程度上取决于业务实现,需要开发者自主实现一些扩展来保证链路完整性
- 高频业务场景缺少一些最佳落地范式,导致自主实现链路追踪困难
此外,由于 WebSocket 链路上也难免存在一些 NoSQL、HTTP 等其他类型的调用,依然需要无侵入探针来保证各种调用的串联,这就要求无侵入探针与自定义扩展产生的链路上下文可以很好地互通。LoongSuite 无侵入探针提供的基于 OpenTelemetry API 的扩展机制就是解决这些问题的最佳手段[9]。
3.1.1 OpenTelemetry API 与 LoongSuite 探针工作原理
OpenTelemetry API 是 OpenTelemetry 社区定义的可观测数据采集标准的重要组件之一[10],它定义了一整套可观测领域使用的 API 行为标准和功能说明,比如可观测数据创建、上下文管理/透传、数据上报等逻辑,并为许多语言提供了配套的 SDK 实现。使用者可以基于 API 与 SDK 比较容易地实现上下文的管理与透传。以下是使用 Tracer API 定义 Span 的示意:
private int doWork() { // 创建 span Span doWorkSpan = tracer.spanBuilder("doWork").startSpan(); // 激活 span 所在上下文 try (Scope scope = doWorkSpan.makeCurrent()) { int result = 0; for (int i = 0; i < 10; i++) { result += i; } return result; } finally { // 结束 span doWorkSpan.end(); } }
LoongSuite 探针是阿里云可观测团队基于 OpenTelemetry 探针构建的,面向 AI 应用的开源的进程内可观测采集组件。对于热门的开源组件,例如 LangChain、OpenAI SDK、Tomcat 等,LoongSuite 探针提供了丰富的预定义插桩实现。使用者不再需要基于 OpenTelemetry API 进行开发,只需要修改编译或运行时命令,探针就能把可观测数据创建、上下文管理/透传、数据上报等关键逻辑自动完成,从而达成无侵入可观测的目标。
LoongSuite 探针可以满足生产应用绝大多数场景下的可观测需求,但对于一些高度自定义的场景,如消息系统中的复杂消费过程、部分 MQTT 场景以及 WebSocket 通信场景,使用 OpenTelemetry API/SDK 添加自定义埋点则是弥补无侵入探针监控盲区的最优方案。
3.1.2 LoongSuite 探针与自定义扩展交互示意
对于 Java、Golang 这类包管理相对严格(需要明确指定版本)的语言来说,探针与应用可能会存在版本不一致的依赖,比如 Jackson、gRPC 和 OpenTelemetry API/SDK 等等。为了避免依赖冲突,常采用 shadow 的方式进行依赖隔离。但这也会导致用户在使用 OpenTelemetry API 和 SDK 自主埋点的时候,产生的链路上下文并不能与探针内互通,进而导致调用链断裂。
OpenTelemetry 和 LoongSuite 探针同样采用代码增强机制保证了链路上下文的共享,具体整体示意如下:
- 探针和应用共用一套 API,API 自身保证向前兼容
- 探针初始化时,会将初始化好的实例对象注册到 GlobalHolder,应用中自定义埋点时,直接从 API 中的 GlobalHolder 就可以获取到探针的实例对象
- 对于 SDK 中定义的一些方法和静态的 API,如 Context、Baggage 等,通过代码增强的方式,跳过这些函数原本的调用,转而使用探针中对应的实现
通过以上机制,LoongSuite 探针可以很好地和 OpenTelemetry API/SDK 创建的 Span 串联在一起,保证了链路的完整性。
3.2 WebSocket 分布式链路追踪最佳实践
了解了这几个组件,关键的问题是,我的应用应该怎么添加这些自定义的埋点呢?在 WebSocket 全链路的实现中,需要先根据业务诉求明确几个问题:
会话粒度问题:一次 WebSocket 连接对应一条 Trace 还是多条 Trace?
- 对应一条 Trace:一次 WebSocket 连接是为了完成一系列相关性强的操作,且持续时间一般仅在数分钟;
- 对应多条 Trace:一次 WebSocket 连接会在建立完成后留存下来持续复用,持续时间可能持续几小时。
调用建模问题:WebSocket 内部的数据传输过程能否建模为离散的请求与响应?
- 如果连接建立后只用于双方传递数据,则不需要为每条消息专门创建 Span,一个 Span 的生命周期应该对应双方传递消息的完整过程;
- 如果连接建立后,一方发送消息,另一方处理消息并返回响应,则每组这类调用都可以创建一对父子 Span,对应的数据结构需要允许承载序列化后的链路上下文。
应对以上几个不同场景,自定义埋点的实现推荐也会有所差异,接下来将分别展开介绍。
3.2.1 引入 OpenTelemetry API 依赖
探针对 API 的兼容为向前兼容,对于最新版本的 API 适配可能比较有限,生产环境中 API 包的版本不需要过新,基本 API 足够使用即可。
对于 Java 语言,建议在 pom.xml 中引入。API 文档:https://javadoc.io/doc/io.opentelemetry/opentelemetry-api/1.28.0/index.html
<dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-api</artifactId> </dependency>
获取探针注入的全局实例:
openTelemetry = GlobalOpenTelemetry.get(); tracer = openTelemetry.getTracer("websocket-example", "1.0.0");
对于 Golang 语言,可以执行 go get 命令获取包。API 文档:https://pkg.go.dev/go.opentelemetry.io/otel@v1.28.0
go get go.opentelemetry.io/otel
获取探针注入的全局实例:
tracer := otel.GetTracerProvider().Tracer("websocket-example")
对于 Python 语言,可以通过 pip install 获取。API 文档:https://opentelemetry-python.readthedocs.io/en/latest/
pip install opentelemetry-api
获取探针注入的全局实例:
from opentelemetry import trace tracer = trace.get_tracer(__name__)
3.2.2 会话粒度问题——创建 WebSocket 连接维度的 Trace
实现建议:WebSocket 在建立连接时会基于 HTTP 请求发起握手,复用该 Trace 上下文作为整次 WebSocket 连接中子操作的上下文。
以下是以一整个 WebSocket 连接为一条 Trace 的实现基本示意图,所有的请求与数据传递都作为子 Span 挂靠在一条 Trace 下面。因此,这种实现更适合 WebSocket 连接按需连接并会及时关闭的场景。
Client 侧代码实现(以 Java 原生提供的 WebSocket 库为例):
public static void main(String[] args) throws Exception { // 1. 创建连接级别的 Trace(在连接前创建,以便在握手时传递 TraceContext) Span connectionSpan = tracer.spanBuilder("websocket.connection") .setAttribute("websocket.endpoint", "/native/ws") .setAttribute("websocket.destination", "ws://localhost:18081") .setAttribute("websocket.connection.type", "client") .startSpan(); // 2. 将当前 Span 激活在线程内的上下文中,标记 Span 的作用域为从连接开始到连接关闭 try (Scope scope = connectionSpan.makeCurrent()) { WebSocketContainer container = ContainerProvider.getWebSocketContainer(); // 创建 WebSocket Client NativeWebSocketClient client = new NativeWebSocketClient(); // 使用 Endpoint 方式连接 Session session = container.connectToServer( new jakarta.websocket.Endpoint() { @Override public void onOpen(Session session, EndpointConfig config) { client.onOpen(session); // 注册消息处理器(使用匿名内部类而不是 lambda,避免泛型类型推断问题) session.addMessageHandler(new MessageHandler.Whole<String>() { @Override public void onMessage(String message) { client.onMessage(message); } }); } @Override public void onClose(Session session, CloseReason closeReason) { client.onClose(); } @Override public void onError(Session session, Throwable thr) { // 记录错误到当前 Span connectionSpan.recordException(thr); client.onError(thr); } }, // 3. 发起握手时,在请求头中携带当前的上下文 createHeaderWithUserProperties(), URI.create("ws://localhost:18081/native/ws")); client.session = session; client.sessionId = session.getId(); log.info("客户端已启动,输入消息发送给服务器(输入 'exit' 退出):"); // 从控制台读取输入 BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); String line; while ((line = reader.readLine()) != null && !line.equals("exit")) { if (!line.trim().isEmpty()) { // 4. 向 Server 发送消息 client.sendMessage(line); } } // 关闭连接 client.close(); log.info("客户端已退出"); } catch (Exception e) { // 如果出现错误,记录到 span 中 connectionSpan.recordException(e); log.error("客户端启动失败", e); } finally { // 5. 结束 span connectionSpan.end(); } // 等待 span 异步上报,实际业务中无需保留 Thread.sleep(5000L); } private static ClientEndpointConfig createHeaderWithUserProperties() { // 创建 ClientEndpointConfig,用于自定义握手请求头 ClientEndpointConfig.Builder configBuilder = ClientEndpointConfig.Builder.create(); // 3.1. 获取当前的 TraceContext,准备 HTTP headers final Map<String, List<String>> headersMap = new HashMap<>(); Context currentContext = Context.current(); // 3.2. 通过全局实例的 ContextPropagators 注入 TraceContext 到 headers openTelemetry.getPropagators().getTextMapPropagator() .inject(currentContext, headersMap, (carrier, key, value) -> carrier.put(key, List.of(value))); // 3.3. 设置 Configurator 来在握手时添加 headers configBuilder.configurator(new ClientEndpointConfig.Configurator() { @Override public void beforeRequest(Map<String, List<String>> headers) { headers.putAll(headersMap); } }); return configBuilder.build(); }
Server 侧代码实现(以 Java 原生提供的 WebSocket 库为例):
@ServerEndpoint(value = "/native/ws", configurator = NativeWebSocketServer.TraceContextConfigurator.class) public class NativeWebSocketServer { // 按照 session 维度管理来自 Client 的上下文 private static final Map<String, Context> connectionTraceContexts = new ConcurrentHashMap<>(); // 1. 定义配置类,用于在握手时提取 TraceContext public static class TraceContextConfigurator extends ServerEndpointConfig.Configurator { private static final TextMapGetter<Map<String, List<String>>> headerGetter = new TextMapGetter<Map<String, List<String>>>() { @Override public Iterable<String> keys(Map<String, List<String>> carrier) { return carrier.keySet(); } @Override public String get(Map<String, List<String>> carrier, String key) { List<String> values = carrier.get(key); return values != null && !values.isEmpty() ? values.get(0) : null; } }; @Override public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { // 从 HTTP headers 中提取 TraceContext Map<String, List<String>> headers = request.getHeaders(); Context extractedContext = openTelemetry.getPropagators() .getTextMapPropagator() .extract(Context.current(), headers, headerGetter); // 2. 将 TraceContext 存储到 userProperties,在 onOpen 时提取 sec.getUserProperties().put("traceContext", extractedContext); } } @OnOpen public void onOpen(Session session, EndpointConfig config) { String sessionId = session.getId(); sessions.put(sessionId, session); // 3. 从 config 的 userProperties 中提取 TraceContext(在 Configurator 中设置) Context parentContext = Context.current(); Object traceContextObj = config.getUserProperties().get("traceContext"); if (traceContextObj instanceof Context) { parentContext = (Context) traceContextObj; } // 4. 将 Client 链路上下文保存下来,在需要创建子 span 时获取即可 connectionTraceContexts.put(sessionId, parentContext); log.info("客户端连接: sessionId={}, 当前连接数={}, 已从 Client TraceContext 创建子 Span", sessionId, sessions.size()); sendMessage(session, "欢迎连接!您的会话ID: " + sessionId); } @OnClose public void onClose(Session session) { String sessionId = session.getId(); sessions.remove(sessionId); connectionTraceContexts.remove(sessionId); log.info("客户端断开: sessionId={}, 剩余连接数={}, Trace已结束", sessionId, sessions.size()); } }
3.2.3 会话粒度问题——使用会话 ID 关联不同的 Trace
实现建议:复用 WebSocket 的 Session ID 作为每条 Span 的属性,在必要时也可以按照属性查询来自于同一个 WebSocket 会话的所有 Trace。
以下是使用会话 ID 关联不同 Trace 的实现基本示意图,每次 Client 侧或 Server 侧发起的主动请求都是一条单独的 Trace,彼此之间并不会在 Trace 瀑布图中呈现关系,但可以通过会话 ID 这个属性进行过滤和查询。因此,这种实现更适合 WebSocket 连接时间很长,且可能存在复用的场景。
使用会话 ID 关联不同 Trace 实现方案相对简单,大多数框架都能直接获取到当前所在会话的 ID,调用 setAttribute API 写入 Span 即可,以下是一个基本示例:
public void sendMessage(Session session, String message) { Span span = tracer.spanBuilder("Client send message").startSpan(); // 向 span 中写入 session id span.setAttribute("websocket.session.id", session.getId()); try (Scope scope = span.makeCurrent()) { doSendMessage(message); } finally { span.end(); } }
3.2.4 调用建模问题——存在明显调用关系
实现建议:仿照 Messaging 系统的链路追踪逻辑,消息的发送者为调用方,消息的接受者为被调用方,分别创建 Span。调用方 Span 作为被调用方的父级。涉及多轮消息发送,只要意图为流式传输,视为一次调用行为。
链路效果如下图所示:
这种情形是生产应用中最普遍碰到的情况,要保证 Client 链路和 Server 链路的串联,需要调用方在发送消息时保证消息中有一个类似 headers 的预留字段用于传递链路上下文,该字段需要被 Client 和 Server 同时支持解析。许多生产服务都预留了这类字段,例如:语音合成CosyVoice WebSocket API#指令(客户端→服务端):https://help.aliyun.com/zh/model-studio/cosyvoice-websocket-api#b0100c3591yqq
调用方代码实现:
public void sendMessage(String message) { // 0. (可选)如果为一个 Connection 创建了 span,需要在此处执行 span.makeCurrent() // 1. 创建 header 字段 HashMap<String, String> headers = new HashMap<>(); // 2. 创建 span 并写入必要的属性 Span span = tracer.spanBuilder("Client send message").startSpan(); span.setAttribute("websocket.session.id", session.getId()); try (Scope scope = span.makeCurrent()) { // 3. 调用 OTel API,将上下文注入到 header 中 openTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(), headers, (headersMap, key, value) -> headersMap.put(key, value)); // 4. 发送消息 // 如果是流式发送消息,则可以仅在第一条消息中添加 header,调用双方需要保证 span 创建的幂等性(即整个流式发送期间仅创建一个 span) sendMessage(message, headers); } finally { span.end(); } }
被调用方代码实现:
public void onMessage(String message, Session session) { String sessionId = session.getId(); try { // 1. 解析消息 MessageWithHeaders msgWithHeaders = objectMapper.readValue(message, MessageWithHeaders.class); Map<String, String> headers = msgWithHeaders.getHeaders(); // 2. 从消息中提取链路上下文 Context remoteContext = openTelemetry.getPropagators().getTextMapPropagator() .extract(Context.current(), headers, new TextMapGetter<Map<String, String>>() { @Override public Iterable<String> keys(Map<String, String> headersMap) { return headersMap.keySet(); } @Override public String get(Map<String, String> headersMap, String key) { return headersMap.getOrDefault(key, null); } }); // 3. 以提取出来的上下文作为父级,创建 Server span Span serverSpan = tracer.spanBuilder("Server handle message") .setParent(remoteContext).startSpan(); try (Scope scope = serverSpan.makeCurrent()) { // 4. 处理消息/流式返回响应 String body = msgWithHeaders.getBody(); log.info("收到消息 [{}] [headers={}]: {}", sessionId, headers, body); // 处理消息(带 headers) handleMessage(session, body, headers); } catch (Exception e) { serverSpan.recordException(e); } finally { serverSpan.end(); } } catch (Exception e) { log.error("消息接收失败 [{}]: {}", sessionId, message, e); } }
3.2.5 调用建模问题——无显式调用关系,仅传输数据
实现建议:数据发送方创建 Span,作为整个 WebSocket 连接 Span(如有)的子 Span,双方 Span 不维持父子关系。
链路效果如下图所示:
数据发送方代码示例:
public void streamingSendMessages(Session session) { // 0. (可选)如果为一个 Connection 创建了 span,需要在此处执行 span.makeCurrent() Context context = connectionTraceContexts.containsKey(session.getId()) ? connectionTraceContexts.get(session.getId()) : Context.current(); try (Scope pScope = context.makeCurrent()) { // 1. 创建 Span Span span = tracer.spanBuilder("Client send message").setParent(context).startSpan(); span.setAttribute("websocket.session.id", session.getId()); try (Scope scope = span.makeCurrent()) { // 2. 发送消息 while (messageQueue != null && messageQueue.containsKey(session.getId())) { List<Message> messages = messageQueue.get(session.getId()); messages.forEach(message -> sendMessage(session, message)); Thread.sleep(200L); } } finally { span.end(); } } }
3.2.6 异步透传问题——进程内异步上下文管理
一般地,在 WebSocket 应用中的异步存在两种实现:
- 基于线程池的异步调度,每当接收到消息,都创建一个 Runnable 或 Callable,或者创建一个 Golang/Python 协程
- 基于进程内队列进行异步通信(如 Java 的 Deque、Golang 的 Channel、Python 的 Generator 等),每当接收到消息都入队,由统一的 Worker 进行处理
对于第一种情形,LoongSuite 探针已经支持上下文的自动透传:
public void onMessage(String message) { Span messageSpan = tracer.spanBuilder("Server handle message").startSpan(); // 把当前 span 激活并放到 ThreadLocal 中 try (Scope scope = messageSpan.makeCurrent()) { // 异步调用消息处理流程 // 探针会在 Runnable 任务被创建时,将 span 所在上下文自动传递到 doHandleMessage 方法内部 // doHandleMessage 方法实际执行时,上下文会被自动复原 workerExecutor.execute(() -> doHandleMessage(message)); } }
对于第二种情形,需要使用者主动进行上下文透传和还原:
public void onMessage(String message) { Span messageSpan = tracer.spanBuilder("Server handle message").startSpan(); // 把当前 span 激活并放到 ThreadLocal 中 try (Scope scope = messageSpan.makeCurrent()) { // 手动将 TraceContext 与 Message 关联(也可以通过 Map) message.setTracingContext(Context.current()); // 消息入队 messageQueue.offer(message); } } public void pollAndHandleMessage() { while (true) { if (!messageQueue.isEmpty) { Message message = messageQueue.poll(); // 消息出队后,获取 TraceContext 与 Span Context tracingContext = message.getTracingContext(); Span span = Span.fromContext(tracingContext); // 将 TraceContext 重新激活并放到 ThreadLocal 中 try (Scope scope = tracingContext.makeCurrent()) { handleMessage(message); } finally { // 消息处理结束,关闭 span span.end(); } } Thread.sleep(100L); } }
3.3 WebSocket 中流式传输的关键业务指标
在 3.2 节中我们可以看到,在流式传输的场景下,我们会把一次完整的请求记录为一条 Span,以防止过多 Span 导致性能瓶颈。但这也会抹去流式传输中的一些关键性能信息——一次消息处理中,某些个别的数据包处理时长过长引发整个响应过程偏慢。实际生产中,这些指标也能很大程度上帮我们衡量应用的健康度与评估某些链路的问题所在,以下是几个常用的业务指标:
以下是计算这些指标的一个简单的工具类实现,关于详细的使用方式,欢迎查看示例代码:https://github.com/Cirilla-zmh/asr-demo/blob/main/asr-service/src/main/java/com/example/asr/ws/AsrWebSocketHandler.java
工具类定义
public class WebSocketPerformanceMeasure { private static final Logger log = LoggerFactory.getLogger(WebSocketPerformanceMeasure.class); private static final long UNINITIALIZED = -1L; private Long startTime; private Long firstChunkTime; private AtomicInteger chunkCounts; private AtomicLong totalInterval; private Long lastChunkTime; public static WebSocketPerformanceMeasure create() { WebSocketPerformanceMeasure measure = new WebSocketPerformanceMeasure(); measure.startTime = System.currentTimeMillis(); measure.firstChunkTime = UNINITIALIZED; measure.chunkCounts = new AtomicInteger(0); measure.totalInterval = new AtomicLong(0); measure.lastChunkTime = UNINITIALIZED; return measure; } /** * 开始测量(如果尚未开始) */ public void start() { if (startTime == null) { startTime = System.currentTimeMillis(); firstChunkTime = UNINITIALIZED; chunkCounts = new AtomicInteger(0); totalInterval = new AtomicLong(0); lastChunkTime = UNINITIALIZED; } } /** * 记录一个 chunk 的到达 * 自动计算 time_to_first_chunk 和更新间隔统计 * * @return 如果是第一个 chunk,返回 time_to_first_chunk(毫秒),否则返回 null */ public Long recordChunk() { if (startTime == null) { log.warn("Performance measure not started, calling start() automatically"); start(); } long currentTime = System.currentTimeMillis(); chunkCounts.incrementAndGet(); // 记录第一个 chunk 的时间 Long timeToFirstChunk = null; if (firstChunkTime == UNINITIALIZED) { timeToFirstChunk = currentTime - startTime; firstChunkTime = currentTime; log.debug("First chunk recorded, time_to_first_chunk: {}ms", timeToFirstChunk); } // 计算 chunk 间隔(从第二个 chunk 开始) if (lastChunkTime != UNINITIALIZED) { long interval = currentTime - lastChunkTime; totalInterval.addAndGet(interval); } lastChunkTime = currentTime; return timeToFirstChunk; } /** * 获取 time_to_first_chunk(毫秒) * 如果第一个 chunk 尚未到达,返回 null */ public Long getTimeToFirstChunk() { if (firstChunkTime == UNINITIALIZED || startTime == null) { return null; } return firstChunkTime - startTime; } /** * 获取 time_to_last_chunk(毫秒) * 需要保证在 chunk 完全到达后调用 * 如果第一个 chunk 尚未到达,返回 null */ public Long getTimeToLastChunk() { if (lastChunkTime == UNINITIALIZED || startTime == null) { return null; } return lastChunkTime - startTime; } /** * 获取平均 chunk 间隔(毫秒) * 如果 chunk 数量少于 2,返回 null */ public Long getAverageInterval() { int count = chunkCounts.get(); if (count < 2 || totalInterval == null) { return null; } return totalInterval.get() / (count - 1); } /** * 获取 chunk 总数 */ public int getChunkCount() { return chunkCounts != null ? chunkCounts.get() : 0; } }
04 典型场景实践:AI 语音对话系统
本节我们将结合一个生产中常见的业务系统,来简要介绍本文方案在该场景下的具体实践。相关 demo 代码已开源,欢迎移步 https://github.com/Cirilla-zmh/asr-demo 实践。
4.1 系统架构解析
以下是系统整体架构的简单示意:
设备端 → WebSocket → ASR → LLM(意图识别) ↓ ├─ 闲聊 → LLM(生成) → TTS → 设备端 └─ 下单 → MCP(下单) → LLM(生成) → TTS → 设备端
调用时序图:
4.2 接入 LoongSuite 探针
在本示例项目中,预留了探针挂载的环境变量,通过挂载 LoongSuite 探针,我们可以将 ASR demo 服务的可观测数据接入到 ARMS 控制台。以下是具体步骤:
1. 下载 LoongSuite 商业化探针并解压
为保证 LLM 链路的完整性,建议下载 4.6.0 及更高版本探针。
wget "http://arms-apm-cn-hangzhou.oss-cn-hangzhou.aliyuncs.com/4.6.0/AliyunJavaAgent.zip" -O AliyunJavaAgent.zip unzip AliyunJavaAgent.zip
2. 参照 README,在启动应用时添加探针挂载相关参数,相关参数可以参考手动安装探针[11]文档获取。
export JAVA_AGENT_OPTIONS="-javaagent:/path/to/4.6.0/AliyunJavaAgent/aliyun-java-agent.jar -Darms.licenseKey=${your_license_key} -Darms.appName=websocket-demo -Daliyun.javaagent.regionId=cn-hangzhou -Darms.workspace=${your_cms_workspace}" ./start.sh
你也可以接入 LoongSuite 开源版本探针或者 OpenTelemetry 探针,并可观测数据上报到开源的可观测平台,受限于篇幅,在此不再展开,欢迎移步 https://github.com/alibaba/loongsuite-java-agent 获取更多信息。
4.3 系统页面与可观测效果示意
以下是部署后的应用系统页面,类似现在的智能机器人 IM 系统,用于替代设备端的左右:
在发起对话后,统计上来的链路如图所示。可以在一条链路中清晰看到每个环节的持续时间:
在 WebSocket 对应 span 中,能够看到统计到的首包延迟与平均输出间隔等指标,帮助分析整体业务性能:
结语:未来展望
WebSocket 领域的全链路可观测性一直以来都是让许多企业开发和运维人员头痛的问题。可观测性的解决方案并不能一蹴而就,需要与用户进行持续深度共建与配合。很兴奋能看到公牛在与可观测团队共同完成了该方案在生产上的实际落地[12],也为我们方案的完善提供了非常宝贵的经验。未来我们将与更多的用户与开源开发者共建,持续补充和建设更完善、更易用的 WebSocket 可观测方案。
欢迎大家关注 LoongSuite 社区,以获取相关方案的最新进展:
参考文章:
[1] RFC 6455
https://datatracker.ietf.org/doc/html/rfc6455
[2] The Road to WebSockets
https://websocket.org/guides/road-to-websockets/
[3] WebSocket Protocol
https://websocket.org/guides/websocket-protocol/
[4] OpenAI Realtime API
https://platform.openai.com/docs/guides/realtime-websocket
[5] 实时多模态交互协议(WebSocket)
https://help.aliyun.com/zh/model-studio/multimodal-interaction-protocol
[6] Live API - WebSockets API reference
https://ai.google.dev/api/live
[7] Trace Context
https://www.w3.org/TR/trace-context/#abstract
[8] Distributed Tracing 基本介绍
https://observability.cn/project/opentelemetry/rp8k7gzvtys07zsb/
[9] 通过OpenTelemetry Java SDK为调用链增加自定义埋点
[10] OpenTelemetry Specification Overview
https://opentelemetry.io/docs/specs/otel/overview/
[11] 手动安装探针
[12] 《让每次语音唤醒都可靠,公牛沐光重构可观测体系》