我们打开nacos的web 页面
[ { "id": "e-commerce-nacos-client", "predicates": [ { "args": { "pattern": "/imooc/ecommerce-nacos-client/**" }, "name": "Path" } ], "uri": "lb://e-commerce-nacos-client", "filters": [ { "name": "HeaderToken" }, { "name": "StripPrefix", "args": { "parts": "1" } } ] }, { "id": "e-commerce-account-service", "predicates": [ { "args": { "pattern": "/imooc/ecommerce-account-service/**" }, "name": "Path" } ], "uri": "lb://e-commerce-account-service", "filters": [ { "name": "StripPrefix", "args": { "parts": "1" } } ] }, { "id": "e-commerce-goods-service", "predicates": [ { "args": { "pattern": "/imooc/ecommerce-goods-service/**" }, "name": "Path" } ], "uri": "lb://e-commerce-goods-service", "filters": [ { "name": "StripPrefix", "args": { "parts": "1" } } ] } ]
动态路由网关的配置
GatewayConfig 创建 config 设置一些 需要的参数 ,比如
超时时间
nacos 服务器的地址
命名空间
data-id
Group id
/** * <h1>配置类, 读取 Nacos 相关的配置项, 用于配置监听器</h1> * */ @Configuration public class GatewayConfig { /** 读取配置的超时时间 */ public static final long DEFAULT_TIMEOUT = 30000; /** Nacos 服务器地址 */ public static String NACOS_SERVER_ADDR; /** 命名空间 */ public static String NACOS_NAMESPACE; /** data-id */ public static String NACOS_ROUTE_DATA_ID; /** 分组 id */ public static String NACOS_ROUTE_GROUP; @Value("${spring.cloud.nacos.discovery.server-addr}") public void setNacosServerAddr(String nacosServerAddr) { NACOS_SERVER_ADDR = nacosServerAddr; } @Value("${spring.cloud.nacos.discovery.namespace}") public void setNacosNamespace(String nacosNamespace) { NACOS_NAMESPACE = nacosNamespace; } @Value("${nacos.gateway.route.config.data-id}") public void setNacosRouteDataId(String nacosRouteDataId) { NACOS_ROUTE_DATA_ID = nacosRouteDataId; } @Value("${nacos.gateway.route.config.group}") public void setNacosRouteGroup(String nacosRouteGroup) { NACOS_ROUTE_GROUP = nacosRouteGroup; } }
这里我们再次梳理一下思路 这里我们保存的这些配置信息,这里我们做的是保存当前配置的,之后发生改变了,我们先监听,再去获取配置信息之后刷新配置。 接下来我们编写注册网关事件更新操作
编写注册网关事件更新
/** * 事件推送 Aware: 动态更新路由网关 Service * */ @Slf4j @Service @SuppressWarnings("all") public class DynamicRouteServiceImpl implements ApplicationEventPublisherAware { /** 写路由定义 */ private final RouteDefinitionWriter routeDefinitionWriter; /** 获取路由定义 */ private final RouteDefinitionLocator routeDefinitionLocator; /** 事件发布 */ private ApplicationEventPublisher publisher; public DynamicRouteServiceImpl(RouteDefinitionWriter routeDefinitionWriter, RouteDefinitionLocator routeDefinitionLocator) { this.routeDefinitionWriter = routeDefinitionWriter; this.routeDefinitionLocator = routeDefinitionLocator; } @Override public void setApplicationEventPublisher( ApplicationEventPublisher applicationEventPublisher) { // 完成事件推送句柄的初始化 this. = applicationEventPublisher; } /** * <h2>增加路由定义</h2> * */ public String addRouteDefinition(RouteDefinition definition) { log.info("gateway add route: [{}]", definition); // 保存路由配置并发布 routeDefinitionWriter.save(Mono.just(definition)).subscribe(); // 发布事件通知给 Gateway, 同步新增的路由定义 this.publisher.publishEvent(new RefreshRoutesEvent(this)); return "success"; } /** * <h2>更新路由</h2> * */ public String updateList(List<RouteDefinition> definitions) { log.info("gateway update route: [{}]", definitions); // 先拿到当前 Gateway 中存储的路由定义 List<RouteDefinition> routeDefinitionsExits = routeDefinitionLocator.getRouteDefinitions().buffer().blockFirst(); if (!CollectionUtils.isEmpty(routeDefinitionsExits)) { // 清除掉之前所有的 "旧的" 路由定义 routeDefinitionsExits.forEach(rd -> { log.info("delete route definition: [{}]", rd); deleteById(rd.getId()); }); } // 把更新的路由定义同步到 gateway 中 definitions.forEach(definition -> updateByRouteDefinition(definition)); return "success"; } /** * <h2>根据路由 id 删除路由配置</h2> * */ private String deleteById(String id) { try { log.info("gateway delete route id: [{}]", id); this.routeDefinitionWriter.delete(Mono.just(id)).subscribe(); // 发布事件通知给 gateway 更新路由定义 this.publisher.publishEvent(new RefreshRoutesEvent(this)); return "delete success"; } catch (Exception ex) { log.error("gateway delete route fail: [{}]", ex.getMessage(), ex); return "delete fail"; } } /** * <h2>更新路由</h2> * 更新的实现策略比较简单: 删除 + 新增 = 更新 * */ private String updateByRouteDefinition(RouteDefinition definition) { try { log.info("gateway update route: [{}]", definition); this.routeDefinitionWriter.delete(Mono.just(definition.getId())); } catch (Exception ex) { return "update fail, not find route routeId: " + definition.getId(); } try { this.routeDefinitionWriter.save(Mono.just(definition)).subscribe(); this.publisher.publishEvent(new RefreshRoutesEvent(this)); return "success"; } catch (Exception ex) { return "update route fail"; } } }
这里我们进行了 事件推送器的操作,更新配置和删除,新增操作,可能没接触过的小伙伴会比较的蒙圈,这里可以去 补充学习一下 Spring5 reactor编程
编写完对应的操作,我们就需要去连接 nacos 之后通过 nacos 的api 获取配置和初始化进行一些操作了
编写 连接 nacos 获取配置
/** * <h1>通过 nacos 下发动态路由配置, 监听 Nacos 中路由配置变更</h1> * */ @Slf4j @Component //这个注解 是一个依赖注解,这里是只一个类加载之后,这个类再加载 @DependsOn({"gatewayConfig"}) public class DynamicRouteServiceImplByNacos { /** Nacos 配置服务 */ private ConfigService configService; private final DynamicRouteServiceImpl dynamicRouteService; public DynamicRouteServiceImplByNacos(DynamicRouteServiceImpl dynamicRouteService) { this.dynamicRouteService = dynamicRouteService; } /** * <h2>Bean 在容器中构造完成之后会执行 init 方法</h2> * */ @PostConstruct public void init() { log.info("gateway route init...."); try { // 初始化 Nacos 配置客户端 configService = initConfigService(); if (null == configService) { log.error("init config service fail"); return; } // 通过 Nacos Config 并指定路由配置路径去获取路由配置 String configInfo = configService.getConfig( GatewayConfig.NACOS_ROUTE_DATA_ID, GatewayConfig.NACOS_ROUTE_GROUP, GatewayConfig.DEFAULT_TIMEOUT ); log.info("get current gateway config: [{}]", configInfo); List<RouteDefinition> definitionList = JSON.parseArray(configInfo, RouteDefinition.class); if (CollectionUtils.isNotEmpty(definitionList)) { for (RouteDefinition definition : definitionList) { log.info("init gateway config: [{}]", definition.toString()); dynamicRouteService.addRouteDefinition(definition); } } } catch (Exception ex) { log.error("gateway route init has some error: [{}]", ex.getMessage(), ex); } // 设置监听器 dynamicRouteByNacosListener(GatewayConfig.NACOS_ROUTE_DATA_ID, GatewayConfig.NACOS_ROUTE_GROUP); } /** * <h2>初始化 Nacos Config</h2> * */ private ConfigService initConfigService() { try { Properties properties = new Properties(); properties.setProperty("serverAddr", GatewayConfig.NACOS_SERVER_ADDR); properties.setProperty("namespace", GatewayConfig.NACOS_NAMESPACE); return configService = NacosFactory.createConfigService(properties); } catch (Exception ex) { log.error("init gateway nacos config error: [{}]", ex.getMessage(), ex); return null; } } /** * <h2>监听 Nacos 下发的动态路由配置</h2> * */ private void dynamicRouteByNacosListener(String dataId, String group) { try { // 给 Nacos Config 客户端增加一个监听器 configService.addListener(dataId, group, new Listener() { /** * <h2>自己提供线程池执行操作</h2> * */ @Override public Executor getExecutor() { return null; } /** * <h2>监听器收到配置更新</h2> * @param configInfo Nacos 中最新的配置定义 * */ @Override public void receiveConfigInfo(String configInfo) { log.info("start to update config: [{}]", configInfo); List<RouteDefinition> definitionList = JSON.parseArray(configInfo, RouteDefinition.class); log.info("update route: [{}]", definitionList.toString()); dynamicRouteService.updateList(definitionList); } }); } catch (NacosException ex) { log.error("dynamic update gateway config error: [{}]", ex.getMessage(), ex); } } }
验证动态配置的可用性
2021-12-08 14:15:58.335 INFO [e-commerce-gateway,,,] 32948 --- [ main] c.i.e.c.DynamicRouteServiceImplByNacos : gateway route init.... 2021-12-08 14:15:58.687 INFO [e-commerce-gateway,,,] 32948 --- [ main] c.i.e.c.DynamicRouteServiceImplByNacos : get current gateway config: [[ { "id": "e-commerce-nacos-client", "predicates": [ { "args": { "pattern": "/imooc/ecommerce-nacos-client/**" }, "name": "Path" } ], "uri": "lb://e-commerce-nacos-client" }]] 2021-12-08 14:15:58.776 INFO [e-commerce-gateway,,,] 32948 --- [ main] c.i.e.c.DynamicRouteServiceImplByNacos : init gateway config: [RouteDefinition{id='e-commerce-nacos-client', predicates=[PredicateDefinition{name='Path', args={pattern=/imooc/ecommerce-nacos-client/**}}], filters=[], uri=lb://e-commerce-nacos-client, order=0, metadata={}}] 2021-12-08 14:15:58.776 INFO [e-commerce-gateway,,,] 32948 --- [ main] c.i.e.config.DynamicRouteServiceImpl : gateway add route: [RouteDefinition{id='e-commerce-nacos-client', predicates=[PredicateDefinition{name='Path', args={pattern=/imooc/ecommerce-nacos-client/**}}], filters=[], uri=lb://e-commerce-nacos-client, order=0, metadata={}}]
可以看到 我们成功的连接到了 nacos 并且拿到了配置
这个时候我们 修改配置 id 变动 这个时候查看我们的控制台
2021-12-08 14:53:35.641 WARN [e-commerce-gateway,,,] 32948 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available. 2021-12-08 14:53:37.788 INFO [e-commerce-gateway,,,] 32948 --- [4f-6d1b8c7fd7ea] c.i.e.c.DynamicRouteServiceImplByNacos : start to update config: [[ { "id": "e-commerce-nacos-client1", "predicates": [ { "args": { "pattern": "/imooc/ecommerce-nacos-client/**" }, "name": "Path" } ], "uri": "lb://e-commerce-nacos-client" }]] 2021-12-08 14:53:37.788 INFO [e-commerce-gateway,,,] 32948 --- [4f-6d1b8c7fd7ea] c.i.e.c.DynamicRouteServiceImplByNacos : update route: [[RouteDefinition{id='e-commerce-nacos-client1', predicates=[PredicateDefinition{name='Path', args={pattern=/imooc/ecommerce-nacos-client/**}}], filters=[], uri=lb://e-commerce-nacos-client, order=0, metadata={}}]] 2021-12-08 14:53:37.788 INFO [e-commerce-gateway,,,] 32948 --- [4f-6d1b8c7fd7ea] c.i.e.config.DynamicRouteServiceImpl : gateway update route: [[RouteDefinition{id='e-commerce-nacos-client1', predicates=[PredicateDefinition{name='Path', args={pattern=/imooc/ecommerce-nacos-client/**}}], filters=[], uri=lb://e-commerce-nacos-client, order=0, metadata={}}]]
就可以看到我们配置更新 日志打出 证明 我们可以在项目启动的时候动态的修改路由配置,网关随着负责增加,需要频繁的变更,所以我们这里才会使用动态配置。
SpringCloud Gateway Filter
认识过滤器 , SpringCloud Gateway Filter
基于过滤器的思想实现,与 zuul 类似 。有 pre 和 post 两种方式都filter,分别处理前置逻辑和后置逻辑
前置 : 客户端请求会经过pre类型的filter 然后将请求转发到具体的业务服务,
**后置:**收到服务端响应后 经过 post 类型的filter 处理 最后返回给客户端
**Filter有两大类别:**全局过滤器和局部过滤器
这里我们查看一下Gateway给我们提供的 局部和全局过滤器的各别思路
全局的过滤器
这里我们可以看到,每一个全局过滤器都需要实现 全局过滤器接口和对应的 filter方法,下面我们来看一下其中一个实现类
RouteToRequestUrlFilter
这个类的核心方法,我们来解读一下这个方法的作用 (以对应代码部分的注释的方式解读)
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { //传入路由对象,从前一个过滤器 Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR); //判断是否为null 就继续通过 if (route == null) { return chain.filter(exchange); } //判断是否有 uri 获取到之后 创建一个新的uri log.trace("RouteToRequestUrlFilter start"); URI uri = exchange.getRequest().getURI(); boolean encoded = containsEncodedParts(uri); URI routeUri = route.getUri(); if (hasAnotherScheme(routeUri)) { // this is a special url, save scheme to special attribute // replace routeUri with schemeSpecificPart exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme()); routeUri = URI.create(routeUri.getSchemeSpecificPart()); } // 如果uri 前面有 lb 就是告诉gateway 不能从uri拿到服务了,要去对应的注册中心获取,此时再用服务地址会抛出异常 if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) { // Load balanced URIs should always have a host. If the host is null it is // most // likely because the host name was invalid (for example included an // underscore) throw new IllegalStateException("Invalid host: " + routeUri.toString()); } //这部分就是 将微服务的地址,转换成 uri的服务地址,方便调用服务,新生成的uri 会继续往下传递 URI mergedUrl = UriComponentsBuilder.fromUri(uri) // .uri(routeUri) .scheme(routeUri.getScheme()).host(routeUri.getHost()) .port(routeUri.getPort()).build(encoded).toUri(); exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl); return chain.filter(exchange); }
局部过滤器
局部过滤器主要方法 都是返回一个 GatewayFilter对象
PrefixPathGatewayFilterFactory 局部 前置过滤器
/** * @author Spencer Gibb */ public class PrefixPathGatewayFilterFactory extends AbstractGatewayFilterFactory<PrefixPathGatewayFilterFactory.Config> { /** * Prefix key. */ //代表他是一个 Pre类型的 public static final String PREFIX_KEY = "prefix"; private static final Log log = LogFactory .getLog(PrefixPathGatewayFilterFactory.class); public PrefixPathGatewayFilterFactory() { super(Config.class); } @Override public List<String> shortcutFieldOrder() { return Arrays.asList(PREFIX_KEY); } @Override public GatewayFilter apply(Config config) { return new GatewayFilter() { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { //校验 是否添加了前缀 boolean alreadyPrefixed = exchange .getAttributeOrDefault(GATEWAY_ALREADY_PREFIXED_ATTR, false); if (alreadyPrefixed) { return chain.filter(exchange); } exchange.getAttributes().put(GATEWAY_ALREADY_PREFIXED_ATTR, true); ServerHttpRequest req = exchange.getRequest(); addOriginalRequestUrl(exchange, req.getURI()); String newPath = config.prefix + req.getURI().getRawPath(); //构造新的 路径 获取到请求 获取到 url 添加一个前缀 ,之后重新构造一个url request ServerHttpRequest request = req.mutate().path(newPath).build(); exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, request.getURI()); if (log.isTraceEnabled()) { log.trace("Prefixed URI with: " + config.prefix + " -> " + request.getURI()); } return chain.filter(exchange.mutate().request(request).build()); } @Override public String toString() { return filterToStringCreator(PrefixPathGatewayFilterFactory.this) .append("prefix", config.getPrefix()).toString(); } }; } public static class Config { private String prefix; public String getPrefix() { return prefix; } public void setPrefix(String prefix) { this.prefix = prefix; } } }
StripPrefixGatewayFilterFactory 后置过滤器
这里 StripPrefix的意思就是 去掉前缀,
/** * This filter removes the first part of the path, known as the prefix, from the request * before sending it downstream. * * @author Ryan Baxter */ public class StripPrefixGatewayFilterFactory extends AbstractGatewayFilterFactory<StripPrefixGatewayFilterFactory.Config> { /** * Parts key. */ public static final String PARTS_KEY = "parts"; public StripPrefixGatewayFilterFactory() { super(Config.class); } @Override public List<String> shortcutFieldOrder() { return Arrays.asList(PARTS_KEY); } @Override public GatewayFilter apply(Config config) { return new GatewayFilter() { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); addOriginalRequestUrl(exchange, request.getURI()); //获取到原始的 uri String path = request.getURI().getRawPath(); String newPath = "/" + Arrays.stream(StringUtils.tokenizeToStringArray(path, "/")) //去掉相应的前缀 .skip(config.parts).collect(Collectors.joining("/")); //之后去构建一个 新的 path newPath += (newPath.length() > 1 && path.endsWith("/") ? "/" : ""); ServerHttpRequest newRequest = request.mutate().path(newPath).build(); //之后转发 exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, newRequest.getURI()); return chain.filter(exchange.mutate().request(newRequest).build()); } @Override public String toString() { return filterToStringCreator(StripPrefixGatewayFilterFactory.this) .append("parts", config.getParts()).toString(); } }; } public static class Config { private int parts; public int getParts() { return parts; } public void setParts(int parts) { this.parts = parts; } } }
过滤器的执行流程
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-J2DcdAbW-1639027996653)(springcloudalibaba项目.assets/image-20211209003355832.png)]
过滤器有优先级之分,Order越大 优先级越来越低,越晚被执行
全局过滤器 所有的请求都会执行
局部过滤器只有配置了对应请求才会执行