SpringCloud Gateway 网关的请求体body的读取和修改

本文涉及的产品
应用实时监控服务-可观测链路OpenTelemetry版,每月50GB免费额度
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
可观测可视化 Grafana 版,10个用户账号 1个月
简介: SpringCloud Gateway 框架中,为了处理请求体body,实现多次读取与修改,创建了一个名为`RequestParamGlobalFilter`的全局过滤器。这个过滤器使用`@Component`和`@Slf4j`注解,实现了`GlobalFilter`和`Ordered`接口,设置最高优先级以首先读取body。它通过缓存请求体并创建装饰过的`ServerHttpRequest`来实现body的动态获取。

SpringCloud Gateway 网关的请求体body的读取和修改

getway需要多次对body 进行操作,需要对body 进行缓存

缓存body 动态多次获取

新建顶层filter,对body 进行缓存

 

import lombok.extern.slf4j.Slf4j;

import org.springframework.cloud.gateway.filter.GatewayFilterChain;

import org.springframework.cloud.gateway.filter.GlobalFilter;

import org.springframework.core.Ordered;

import org.springframework.core.io.buffer.DataBuffer;

import org.springframework.core.io.buffer.DataBufferUtils;

import org.springframework.core.io.buffer.DefaultDataBuffer;

import org.springframework.core.io.buffer.DefaultDataBufferFactory;

import org.springframework.http.HttpHeaders;

import org.springframework.http.MediaType;

import org.springframework.http.codec.HttpMessageReader;

import org.springframework.http.server.reactive.ServerHttpRequest;

import org.springframework.http.server.reactive.ServerHttpRequestDecorator;

import org.springframework.stereotype.Component;

import org.springframework.web.reactive.function.server.HandlerStrategies;

import org.springframework.web.reactive.function.server.ServerRequest;

import org.springframework.web.server.ServerWebExchange;

import reactor.core.publisher.Flux;

import reactor.core.publisher.Mono;

import java.util.List;

/**

* @author: zhoumo

* @descriptions:

*/

@Component

@Slf4j

public class RequestParamGlobalFilter implements GlobalFilter, Ordered {

   @Override

   public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

       /**

        * save request path and serviceId into gateway context

        */

       ServerHttpRequest request = exchange.getRequest();

       HttpHeaders headers = request.getHeaders();

       // 处理参数

       MediaType contentType = headers.getContentType();

       long contentLength = headers.getContentLength();

       if (contentLength > 0) {

                 return readBody(exchange, chain);

       }

       return chain.filter(exchange);

   }

   /**

    * default HttpMessageReader

    */

   private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();

   /**

    * ReadJsonBody

    *

    * @param exchange

    * @param chain

    * @return

    */

   private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain) {

       /**

        * join the body

        */

       return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {

           byte[] bytes = new byte[dataBuffer.readableByteCount()];

           dataBuffer.read(bytes);

           DataBufferUtils.release(dataBuffer);

           Flux<DataBuffer> cachedFlux = Flux.defer(() -> {

               DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);

               DataBufferUtils.retain(buffer);

               return Mono.just(buffer);

           });

           /**

            * repackage ServerHttpRequest

            */

           ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {

               @Override

               public Flux<DataBuffer> getBody() {

                   return cachedFlux;

               }

           };

           /**

            * mutate exchage with new ServerHttpRequest

            */

           ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();

           /**

            * read body string with default messageReaders

            */

           return ServerRequest.create(mutatedExchange, messageReaders).bodyToMono(String.class)

                   .doOnNext(objectValue -> {

                       log.debug("[GatewayContext]Read JsonBody:{}", objectValue);

                   }).then(chain.filter(mutatedExchange));

       });

   }

   @Override

   public int getOrder() {

       return HIGHEST_PRECEDENCE;

   }

}

在子节点层获取body




AtomicReference<String> requestBody = new AtomicReference<>("");                RecorderServerHttpRequestDecorator requestDecorator = new RecorderServerHttpRequestDecorator(request);                Flux<DataBuffer> body = requestDecorator.getBody();                body.subscribe(buffer -> {                    CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());                    requestBody.set(charBuffer.toString());                });                String body= requestBody.get();

重写获取body方法



  public class RecorderServerHttpRequestDecorator  extends ServerHttpRequestDecorator {        private final List<DataBuffer> dataBuffers = new ArrayList<>();        public RecorderServerHttpRequestDecorator(ServerHttpRequest delegate) {            super(delegate);            super.getBody().map(dataBuffer -> {                dataBuffers.add(dataBuffer);                return dataBuffer;            }).subscribe();        }        @Override        public Flux<DataBuffer> getBody() {            return copy();        }        private Flux<DataBuffer> copy() {            return Flux.fromIterable(dataBuffers)                    .map(buf -> buf.factory().wrap(buf.asByteBuffer()));        }    }


对body 进行修改重新封装



               String str=""+encodedDecryptedParam;                DataBuffer bodyDataBuffer = stringBuffer(str);                Flux<DataBuffer> bodyFlux = Flux.just(bodyDataBuffer);                MediaType contentType = request.getHeaders().getContentType();                ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(                        exchange.getRequest()) {                    @Override                    public HttpHeaders getHeaders() {                        HttpHeaders httpHeaders = new HttpHeaders();                        int length = str.getBytes().length;                        httpHeaders.putAll(super.getHeaders());                        httpHeaders.remove(HttpHeaders.CONTENT_TYPE);                        httpHeaders.remove(HttpHeaders.CONTENT_LENGTH);                        httpHeaders.setContentLength(length);                        httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType.toString());                        // 设置CONTENT_TYPE                        return httpHeaders;                    }                    @Override                    public Flux<DataBuffer> getBody() {                        return bodyFlux;                    }                };                return chain.filter(exchange.mutate().request(mutatedRequest).build());



一定必须加上 public HttpHeaders getHeaders()对header 重新封装,否则接口层会卡死,request 无限大

目录
相关文章
|
23天前
|
负载均衡 Java 应用服务中间件
Gateway服务网关
Gateway服务网关
30 1
Gateway服务网关
|
2月前
|
XML Java 数据格式
如何使用 Spring Cloud 实现网关
如何使用 Spring Cloud 实现网关
40 3
|
3月前
|
负载均衡 Java Nacos
SpringCloud基础2——Nacos配置、Feign、Gateway
nacos配置管理、Feign远程调用、Gateway服务网关
SpringCloud基础2——Nacos配置、Feign、Gateway
|
3月前
|
负载均衡 Java 网络架构
实现微服务网关:Zuul与Spring Cloud Gateway的比较分析
实现微服务网关:Zuul与Spring Cloud Gateway的比较分析
125 5
|
2月前
|
缓存 网络协议 API
【Azure 环境】请求经过应用程序网关,当响应内容大时遇见504超时报错
应用程序网关的响应缓冲区可以收集后端服务器发送的全部或部分响应数据包,然后再将它们发送给客户端。 默认在应用程序网关上启用响应缓冲,这对于适应缓慢的客户端很有用。
|
4月前
|
Java API 微服务
服务网关Gateway
该博客文章详细介绍了Spring Cloud Gateway的使用方法和概念。文章首先阐述了API网关在微服务架构中的重要性,解释了客户端直接与微服务通信可能带来的问题。接着,文章通过具体的示例代码,展示了如何在Spring Cloud Gateway中添加依赖、编写路由规则,并对路由规则中的基本概念如Route、Predicate和Filter进行了详细解释。最后,文章还提供了路由规则的测试方法。
服务网关Gateway
|
4月前
|
负载均衡 Java API
深度解析SpringCloud微服务跨域联动:RestTemplate如何驾驭HTTP请求,打造无缝远程通信桥梁
【8月更文挑战第3天】踏入Spring Cloud的微服务世界,服务间的通信至关重要。RestTemplate作为Spring框架的同步客户端工具,以其简便性成为HTTP通信的首选。本文将介绍如何在Spring Cloud环境中运用RestTemplate实现跨服务调用,从配置到实战代码,再到注意事项如错误处理、服务发现与负载均衡策略,帮助你构建高效稳定的微服务系统。
96 2
|
5月前
|
Java 微服务 Spring
SpringCloud gateway自定义请求的 httpClient
SpringCloud gateway自定义请求的 httpClient
204 3
|
4月前
|
安全 API
【Azure API 管理】APIM Self-Host Gateway 自建本地环境中的网关数量超过10个且它们的出口IP为同一个时出现的429错误
【Azure API 管理】APIM Self-Host Gateway 自建本地环境中的网关数量超过10个且它们的出口IP为同一个时出现的429错误
|
4月前
|
存储 容器
【Azure 事件中心】为应用程序网关(Application Gateway with WAF) 配置诊断日志,发送到事件中心
【Azure 事件中心】为应用程序网关(Application Gateway with WAF) 配置诊断日志,发送到事件中心