@[toc]
我们都知道,SpringCloud是微服务的一站式解决方案,是众多组件的集合,而因为SpringCloud中几乎所有的组件使用的都是Netflix公司的产品,其中大部分已经进入了停止更新或者维护阶段。我们需要一些别的组件来代替它们,基于此,SpringCloud Alibaba诞生了。
本篇文章我们通过几个具体的业务场景,将SpringCloud Aibaba技术栈融入其中,来感受一下它的便利与强大。
环境搭建
创建父项目,修改pom文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wwj</groupId>
<artifactId>springcloud-alibaba</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>shop-common</module>
<module>shop-user</module>
<module>shop-product</module>
<module>shop-order</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
<spring-cloud-alibaba.version>2.2.5.RELEASE</spring-cloud-alibaba.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>
这里需要创建四个微服务,我们以创建用户服务为例,其它三个服务的创建流程都一样,首先添加依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.13</version>
</dependency>
<dependency>
<groupId>com.wwj</groupId>
<artifactId>shop-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
创建配置文件:
server:
port: 8071
spring:
application:
name: service-user
datasource:
url: jdbc:mysql:///springcloudalibaba?serviceTimezone=UTC
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123456
type: com.alibaba.druid.pool.DruidDataSource
mybatis-plus:
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 输出sql
创建Controller:
@RestController
public class UserController {
}
创建Mapper:
public interface UserMapper extends BaseMapper<User> {
}
创建Service:
@Service
public class UserServiceImpl implements UserService {
}
创建启动类:
@SpringBootApplication
@MapperScan("com.wwj.mapper")
public class UserApplication {
public static void main(String[] args) {
SpringApplication.run(UserApplication.class, args);
}
}
最后创建数据表:
CREATE DATABASE springcloudalibaba;
USE springcloudalibaba;
DROP TABLE IF EXISTS shop_order;
CREATE TABLE shop_order (
`oid` int(11) NOT NULL,
`uid` int(11) DEFAULT NULL,
`username` varchar(255) DEFAULT NULL,
`pid` int(11) DEFAULT NULL,
`pname` varchar(255) DEFAULT NULL,
`pprice` double DEFAULT NULL,
`number` int(11) DEFAULT NULL,
PRIMARY KEY (`oid`)
) ENGINE=InnoDB DEFAULT CHARSET=gbk;
DROP TABLE IF EXISTS `shop_product`;
CREATE TABLE `shop_product` (
`id` int(11) NOT NULL,
`pname` varchar(255) DEFAULT NULL,
`pprice` double DEFAULT NULL,
`stock` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=gbk;
DROP TABLE IF EXISTS `shop_user`;
CREATE TABLE `shop_user` (
`uid` int(11) NOT NULL,
`username` varchar(255) DEFAULT NULL,
`password` varchar(255) DEFAULT NULL,
`telephone` varchar(255) DEFAULT NULL,
PRIMARY KEY (`uid`)
) ENGINE=InnoDB DEFAULT CHARSET=gbk;
将实体类创建在shop-common服务中,项目结构如下:
服务调用--RestTemplate
下面模拟一个用户下订单的业务,首先插入测试数据:
insert into shop_product values(null,'小米','1000','5000');
insert into shop_product values(null,'华为','2000','5000');
insert into shop_product values(null,'苹果','3000','5000');
insert into shop_product values(null,'OPPO','4000','5000');
编写Service:
@Service
public class ProductServiceImpl implements ProductService {
@Autowired
private ProductMapper productMapper;
@Override
public Product queryProductByPid(Integer pid) {
return productMapper.selectById(pid);
}
}
编写Controller:
@RestController
@Slf4j
public class ProductController {
@Autowired
private ProductService productService;
@RequestMapping("/product/{pid}")
public Product product(@PathVariable("pid") Integer pid) {
log.info("调用商品服务,查询{}号商品", pid);
Product product = productService.queryProductByPid(pid);
log.info("查询成功,商品信息:{}", JSON.toJSONString(product));
return product;
}
}
启动微服务,访问http://localhost:8081/product/1:
这样商品服务就编写好了,接下来编写订单服务,首先仍然是Service:
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderMapper orderMapper;
@Override
public void createOrder(Order order) {
orderMapper.insert(order);
}
}
因为订单服务和商品服务是两个独立的模块,它们之间无法直接相互调用,所以我们这里使用RestTemplate进行调用,在容器中注册一下:
@Configuration
public class MyConfig {
@Bean
public RestTemplate restTemplate(){
return new RestTemplate();
}
}
然后编写Controller:
@RestController
@Slf4j
public class OrderController {
@Autowired
private RestTemplate restTemplate;
@Autowired
private OrderService orderService;
@RequestMapping("/order/product/{pid}")
public Order order(@PathVariable("pid") Integer pid) {
log.info("调用订单服务,商品编号:{}", pid);
Product product = restTemplate.getForObject("http://localhost:8081/product/" + pid, Product.class);
// 创建订单
Order order = new Order();
// 模拟数据
order.setUid(1);
order.setUsername("测试用户");
order.setPid(pid);
order.setPname(product.getPname());
order.setPprice(product.getPprice());
order.setNumber(1);
orderService.createOrder(order);
log.info("订单创建成功,订单信息:{}", JSON.toJSONString(order));
return order;
}
}
启动该服务,访问http://localhost:8091/order/product/1:
功能没有问题,但问题是显而易见的,使用RestTemplate调用时,对于需要调用的服务地址是写死在程序中的,即使抽取成公共变量,在服务修改地址或服务名时,仍然需要在程序中进行修改,而且微服务往往需要做集群,鉴于这些场景,我们需要用到 服务治理
。
服务治理--Nacos
服务治理是微服务架构中最核心最基本的模块,用于实现各个微服务的自动化注册和发现。
通常情况下所有服务都由一个服务注册中心进行管理,它提供服务注册、服务发现和服务剔除的功能,这里我们使用Nacos作为服务注册中心进行开发。
Nacos是阿里巴巴开源的一款产品,它是一个可执行的文件,下面我们就来安装一下,下载地址:http://github.com/alibaba/nacos/releases
下载完成后解压缩,会得到一个文件夹:
执行bin目录下的startup.cmd即可启动Nacos:
这样就表示启动成功了,然后访问http://localhost:8848/nacos/ 即可进入Nacos的后台管理页面:
用户名和密码默认均为nacos,点击登录进入系统:
该系统中的相关操作后面都会有所介绍。
启动了Nacos之后,我们需要将将刚才的服务注册到Nacos中,首先引入依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
添加配置:
spring:
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
最后在启动类上声明@EnableDiscoveryClient注解,启动服务,查看Nacos后台:
点击左侧的服务列表,当看到service-product服务时说明注册成功了,以同样的方式将订单服务也注册进Nacos:
服务注册成功之后,我们的调用过程就需要修改了:
@Autowired
private DiscoveryClient discoveryClient;
@RequestMapping("/order/product/{pid}")
public Order order(@PathVariable("pid") Integer pid) {
log.info("调用订单服务,商品编号:{}", pid);
// 获取服务集合
List<ServiceInstance> instances = discoveryClient.getInstances("service-product");
// 取出第一个服务实例
ServiceInstance serviceInstance = instances.get(0);
String host = serviceInstance.getHost(); // 获取服务ip
int port = serviceInstance.getPort(); // 获取服务端口
Product product = restTemplate.getForObject("http://" + host + ":" + port + "/product/" + pid, Product.class);
// 创建订单
Order order = new Order();
// 模拟数据
order.setUid(1);
order.setUsername("测试用户");
order.setPid(pid);
order.setPname(product.getPname());
order.setPprice(product.getPprice());
order.setNumber(1);
orderService.createOrder(order);
log.info("订单创建成功,订单信息:{}", JSON.toJSONString(order));
return order;
}
注入DiscoveryClient对象,并通过它获取服务实例,进而获取服务ip和端口号进行调用。
负载均衡--Ribbon
为了保证系统的高可用,我们往往需要搭建服务集群,然而用户请求如何被分摊到集群中的各个服务上呢?
我们直接通过idea快速创建一个新的商品服务模拟集群环境:
启动该服务:
下面我们就来实现一下负载均衡:
// 随机取出服务实例
int index = new Random().nextInt(instances.size());
ServiceInstance serviceInstance = instances.get(index);
实现非常简单,只需要在服务实例集合大小中随机出一个数作为索引,然后取出该服务实例即可。
我们自定义的负载策略是非常局限的,为此,需要一个开源的组件来帮助实现负载均衡,它就是 Ribbon
。
首先在注册RestTemplate时添加一个@LoadBalanced注解:
@Bean
@LoadBalanced
public RestTemplate restTemplate(){
return new RestTemplate();
}
然后调用方式又发生了变化:
Product product = restTemplate.getForObject("http://service-product/product/" + pid, Product.class);
ip和端口号统统不需要写了,只需要写上服务名即可,Ribbon会自动发现服务并进行调用,默认采用的是轮询策略,即:依次将请求分摊给每个服务,让每个服务处理的请求数基本持平。
除了轮询策略,Ribbon还提供了丰富的负载策略:
- BestAvailabl:选择一个最小的并发请求的 Server,逐个考察 Server,如果 Server 被标记为错误,则跳过,然后再选择 ActiveRequestCount 中最小的 Server
- AvailabilityFilteringRule:过滤掉那些一直连接失败的且被标记为 circuit tripped 的后端 Server,并过滤掉那些高并发的后端 Server 或者使用一个 AvailabilityPredicate 来包含过滤 Server 的逻辑。其实就是检查 Status 里记录的各个 Server 的运行状态
- ZoneAvoidanceRule:使用 ZoneAvoidancePredicate 和 AvailabilityPredicate 来判断是否选择某个 Server,前一个判断判定一个 Zone 的运行性能是否可用,剔除不可用的 Zone(的所有 Server),AvailabilityPredicate 用于过滤掉连接数过多的 Server
- RandomRule:随机选择一个 Server
- RoundRobinRule:轮询选择,轮询 index,选择 index 对应位置的 Server
- RetryRule:对选定的负载均衡策略机上重试机制,也就是说当选定了某个策略进行请求负载时在一个配置时间段内若选择 Server 不成功,则一直尝试使用 subRule 的方式选择一个可用的 Server
- ResponseTimeWeightedRule:作用同 WeightedResponseTimeRule,ResponseTime-Weighted Rule 后来改名为 WeightedResponseTimeRule
- WeightedResponseTimeRule:根据响应时间分配一个 Weight(权重),响应时间越长,Weight 越小,被选中的可能性越低
这里以随机负载策略为例,在主启动类的上层新建一个包,并创建配置类:
@Configuration
public class MyRule {
@Bean
public IRule iRule(){
return new RandomRule();
}
}
在主启动类声明@RibbonClient注解:
@EnableDiscoveryClient
@RibbonClient(name = "service-product",configuration = MyRule.class)
@SpringBootApplication
@MapperScan("com.wwj.mapper")
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}
其中name为需要负载的服务名,configuration为规则配置类。
服务调用--OpenFeign
使用Ribbon进行服务调用有诸多缺点,虽然服务注册到Nacos之后,我们无需再关心调用服务的ip和端口号,但地址参数等内容仍然需要做拼接,而且风格与我们的项目格格不入,为此,我们可以使用SpringCloud提供的另一个组件, OpenFeign
,通过OpenFeign,我们在调用远程服务时会像调用本地服务那样简单。
首先加入依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
然后定义一个接口:
@FeignClient("service-product")
public interface ProductService {
@RequestMapping("/product/{pid}")
Product product(@PathVariable("pid") Integer pid);
}
其中@FeignClient注解用于声明这是一个远程服务接口,值为需要远程调用的服务名,然后在接口中定义方法,一般与远程服务提供的方法声明一致:
@RequestMapping("/product/{pid}")
public Product product(@PathVariable("pid") Integer pid) {
log.info("调用商品服务,查询{}号商品", pid);
Product product = productService.queryProductByPid(pid);
log.info("查询成功,商品信息:{}", JSON.toJSONString(product));
return product;
}
声明好接口后就可以直接注入该接口并调用方法:
@Autowired
private ProductService productService;
Product product = productService.product(pid);
最后在启动类上声明@EnableFeignClients注解即可。
服务容错--Sentinel
在微服务架构中,我们将业务拆分成了一个一个的服务,服务与服务之间可以相互调用,但是由于网络或者自身的原因,服务并不能保证100%可用,如果单个服务出现问题,调用这个服务就会出现延迟,此时若有大量的网络涌入,会形成任务堆积,最终导致服务瘫痪。
对于这些问题,业内提供了一系列的容错解决方案,下面是一些较为常用的:
- 隔离:它是指将系统按照一定的原则划分为若干个服务模块,各个模块之间相互独立,无强依赖;当有故障发生时,能将问题和影响隔离在某个模块内部,而无扩散风险,不涉及其它模块,不影响整体的系统服务;常见的隔离方式有:线程池隔离和信号量隔离
- 超时:在上游服务调用下游服务时,设置一个最大响应时间,如果超过这个时间,下游未作出反应,就断开请求,释放线程
- 限流:限流就是限制系统的输入和输出流量已达到保护系统的目的,为了保证系统的稳固运行,一旦达到需要限制的阈值,就需要限制流量并采取少量措施以完成限制流量的目的
熔断:当下游服务因访问压力过大而响应变慢或失败,上游服务为了保护系统整体的可用性,可以暂时切断对下游服务的调用,这种牺牲局部,保全整体的措施就叫熔断,熔断一般有三种状态:
- 熔断关闭状态:服务没有故障时熔断器所处的状态,对调用方的调用不作任何限制
- 熔断开启状态:后续对该服务接口的调用不再经过网络,直接执行本地的fallback方法
- 半熔断状态:尝试恢复服务调用,允许有限的流量调用该服务,并监控调用成功率,如果成功率达到预期,则说明服务已恢复,进入熔断关闭状态;如果成功率仍旧很低,则重新进入熔断关闭状态
- 降级:降级就是为服务提供一个拖底方案,一旦服务无法正常调用,就使用托底方案
这里我们使用阿里巴巴开源的Sentinel组件来实现服务容错,Sentinel承接了阿里巴巴近10年的双十一大促流量的核心场景,例如秒杀、消息削峰填谷、集群流量控制、实时熔断下游不可用应用等。
首先我们需要下载Sentinel,地址:https://github.com/alibaba/Sentinel/releases
Sentinel提供了一个独立的后台系统,用于设置和监控服务信息,下载得到sentinel的jar包,它是一个SpringBoot开发的应用,所以我们可以直接启动它:
java -jar sentinel-dashboard-1.8.1.jar
端口号默认是8080,访问 http://localhost:8080/:
用户名和密码默认都是sentinel,接下来我们要将服务接入到Sentinel中,首先在订单服务中引入依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
配置一下:
spring:
cloud:
sentinel:
transport:
port: 9999 # 指定与Sentinel交互的端口号
dashboard: 127.0.0.1:8080 # 指定Sentinel后台系统地址
其中port是与Sentinel交互的端口号,因为Sentinel可能有数据需要传递给服务,所以需要给Sentinel一个端口,该端口只要未被占用即可;dashboard用于指定Sentinel后台系统的地址。
然后编写两个控制方法进行测试:
@RequestMapping("/order/message")
public String message() {
return "message";
}
@RequestMapping("/order/message2")
public String message2() {
return "message2";
}
Sentinel是懒加载机制,所以要先访问一下这两个控制方法,然后查看后台监控:
现在对Sentinel的使用进行一个简单的入门:
点击左侧的簇点链路,然后对/order/message资源进行流控,流控规则设置如下:
此时若是每秒访问该资源超过两次,则直接会被控制,看效果:
接下来我们将分别介绍Sentinel中的各项配置。
流控规则
流量控制,其原理监控应用流量的QPS或并发线程数等指标,当达到指定的阈值时对流量进行控制,以避免被瞬时的流量高峰冲垮,从而保障应用的高可用性。
设置方法是点击左侧菜单的簇点链路,然后点击需要设置的资源上的流控按钮:
弹出提示框:
其中资源名用于设置一个唯一的资源名称,默认值为资源访问路径,也可以设置成别的值;针对来源用于设置针对哪个微服务进行限流,默认为default,意思是不区分来源,全部限制;阈值类型用于指定限制条件,比如QPS为根据每秒请求数进行限流,线程数为根据最大并发线程数进行限制;单击阈值设置的就是前面阈值类型的最大值。
在前面我们已经体会到了QPS的效果,那么来看看线程数的控制效果是什么样的呢?
将线程数阈值设置为2,此时表示当前只允许两个线程并发访问,我们可以使用jmeter工具进行测试,首先新建拥有两个线程的线程组:
然后设置一下请求信息:
运行之后,再到浏览器上访问,会被直接控制:
流控规则里还能够设置一些高级功能,以QPS为例:
高级选项中能够设置流控模式和流控效果,我们之前看到的效果就是直接模式的快速失败效果,接下来我们测试一下关联模式,首先配置如下:
这样配置以后,当关联资源超过了上面配置的阈值,即每秒允许在最大请求数为2后,会导致当前资源被控制,它可以运用在订单和支付服务之间,因为支付对安全性要求很高,所以我们可以设置一个规则,当支付服务每秒请求数超过某个值之后就控制订单服务,让用户无法继续下订单,以保证支付服务的可用性。
第三种模式是链路模式,它是针对来源更加细粒度的操作,针对来源可以配置对哪个微服务限流,而链路则可以配置针对哪个接口进行限制,那么首先就添加一个Service方法:
@Service
public class OrderMessageService {
/**
* 定义资源
* @return
*/
@SentinelResource("message")
public String message(){
return "message";
}
}
在方法上添加@SentinelResource注解将其标注为一个资源,注解值为该资源的资源名,然后我们需要在控制方法中调用它:
@RestController
public class OrderController2 {
@Autowired
private OrderMessageService orderMessageService;
@RequestMapping("/order/message")
public String message() {
orderMessageService.message();
return "message";
}
@RequestMapping("/order/message2")
public String message2() {
orderMessageService.message();
return "message2";
}
}
现在的情况是这两个控制方法均调用了Service方法,所以我们可以为Service资源设置一个链路规则,规定 /order/message
这条链路的QPS不能大于2,否则进行控制,同样点击菜单左侧的簇点链路,然后对message资源进行流控:
配置如下:
最后不要忘记在配置文件中进行一项配置,否则链路模式的流控是不生效的:
spring:
cloud:
sentinel:
web-context-unify: false
效果如下:
我们已经了解高级选项中的流控模式,接下来看看流控效果:
其中快速失败指的是当请求数或线程数超过阈值,则直接抛出异常;
Warm Up指的是预热,它从开始阈值到最大阈值这段时间会有一个缓冲阶段,一开始的阈值是最大QPS阈值的1/3,然后慢慢增大,直到最大阈值,适用于将突然增大的流量转换为缓步增长的场景;
而排队等待是让请求以均匀的速度通过,单机阈值为每秒通过数量,其余的排队等待,排队等待还需要设置一个超时时间,当请求超过设置的超时时间后还未被处理,则该请求会被丢弃。
降级规则
降级规则就是设置当满足什么条件时,对服务进行降级处理,同样点击左侧菜单的簇点链路,然后点击对应资源的降级按钮:
来看一看降级规则有哪些需要设置的内容:
Sentinel提供了三种熔断策略,我们一一来体会,首先是慢调用比例,它选择以慢调用比例作为阈值,需要设置允许的慢调用 RT(即最大的响应时间),请求的响应时间大于该值则统计为慢调用。当单位统计时长内请求数目大于设置的最小请求数目,并且慢调用的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求响应时间小于设置的慢调用 RT 则结束熔断,若大于设置的慢调用 RT 则会再次被熔断。
比如这样的配置:
它表示统计时长,也就是1秒内请求数大于1,且慢调用比例大于0.5(这里只要请求的响应时间大于5毫秒则认为是慢调用),若是这两个条件都满足,则熔断降级5秒,5秒过后,探测器会进入探测状态,若接下来的一个请求响应时间小于5毫秒则结束熔断,若大于5毫秒,则再次熔断。
需要注意,Sentinel默认统计的RT上限是4900毫秒,超出此阈值的数据都会被认为是4900毫秒,若确实需要设置比上限还大的RT,则可以通过启动Sentinel时携带参数来实现:
-Dcsp.sentinel.statistic.max.rt=xxx
第二种策略是异常比例,当单位统计时长内请求数目大于设置的最小请求数目,并且异常的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。异常比率的阈值范围是 [0.0, 1.0]
,代表 0% - 100%。
比如这样的配置:
它表示的是在1秒内若是请求数大于1,并且异常比例大于25%,也就是说4次请求中有多于1次的请求抛出了异常,则这个异常比例就大于25%了,此时请求会被熔断5秒,5秒后探测器进入探测状态,若是接下来的一个请求成功完成则结束熔断,否则再次熔断。
最后一种策略是异常数,当单位统计时长内的异常数目超过阈值之后会自动进行熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。
比如:
它表示1秒内请求数大于1,并且异常数也大于1,则熔断5秒,5秒后探测器进入探测状态,若接下来的一个请求成功完成,则结束熔断,否则再次熔断。
热点规则
热点规则是一种更细粒度的流控规则,它允许将规则具体到参数上。
编写代码:
@RequestMapping("/order/message3")
@SentinelResource("message3")
public String message3(String name, Integer age) {
return "message3:" + name + "," + age;
}
现有一个控制方法接收两个参数,我们使用@SentinelResource注解将其声明为一个资源,然后在Sentinel控制台设置一下热点规则:
一定要注意应该设置在哪个资源上,因为注解值为message3,所以应该对message3资源进行设置:
参数索引指的是方法中的参数,0为第一个参数,1为第二个参数,对索引为0的参数进行热点规则设置,此时携带该参数的请求在1秒内若是多于3个,则直接控制。
在底下还能够设置参数例外项,这里的意思是设置携带参数age的请求在1秒内不能多于3个,否则直接控制;但若是age值为20,则限流阈值会被指定为1000,即:1秒内不能多于1000个请求。
授权规则
很多时候,我们需要根据调用来源判断该次请求是否允许放行,这时候可以使用Sentinel的来源访问控制的功能,来源访问控制根据资源的请求来源限制资源是否能够通过。
授权规则是需要通过一个标识来匹配来源是否合法的,所以我们需要编写代码:
@Component
public class RequestOriginParserDefinition implements RequestOriginParser {
/**
* 定义区分来源
*
* @param request
* @return
*/
@Override
public String parseOrigin(HttpServletRequest request) {
String serviceName = request.getParameter("serviceName");
if (StringUtils.isEmpty(serviceName)) {
throw new RuntimeException("serviceName is not empty");
}
return serviceName;
}
}
该方法将会从请求域中获取一个serviceName属性值,并将它返回给Sentinel进行匹配,所以我们要在Sentinel中进行授权规则的设置:
授权规则采用两种形式,白名单和黑名单,若是白名单,则除了当前配置的应用外,其它应用都将被控制;若是黑名单,则相反。
这里我们设置流控应用为A,类型为白名单,则携带serviceName=A属性的请求不会被限制,除此之外的请求都将被控制:
自定义异常返回页面
在前面的各个规则中,我们发现当资源被控制后,显示的错误页面都是一致的,这样我们很难区分究竟是什么问题,对于用户这样的页面也不友好,为此,我们需要自定义异常的返回页面,方式如下:
@Component
public class ExceptionHandlerPage implements BlockExceptionHandler {
@Override
public void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception {
response.setContentType("application/json;charset=utf-8");
ResponseData responseData = null;
if (e instanceof FlowException) {
// 如果是限流异常
responseData = new ResponseData(-1, "限流异常...");
} else if (e instanceof DegradeException) {
// 如果是降级异常
responseData = new ResponseData(-2, "降级异常...");
} else if (e instanceof ParamFlowException) {
// 如果是参数限流异常
responseData = new ResponseData(-3, "参数限流异常");
} else if (e instanceof AuthorityException) {
responseData = new ResponseData(-4, "授权异常");
} else if (e instanceof SystemBlockException) {
responseData = new ResponseData(-5, "系统负载异常");
}
response.getWriter().write(JSON.toJSONString(responseData));
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class ResponseData {
private int code;
private String message;
}
可以对每个异常进行定制化处理。
@SentinelResource
前面我们已经使用到了@SentinelResource注解,它的作用是声明一个资源,但它的作用远不止如此,它可以定义当资源内部发生异常时的处理逻辑:
@Service
@Slf4j
public class OrderMessageService {
@SentinelResource(value = "message", blockHandler = "blockHandler", fallback = "fallback")
public String message(String name) {
return "message";
}
public String blockHandler(String name, BlockException e) {
log.error("BlockException...{}", e);
return "BlockException";
}
public String fallback(String name, Throwable t) {
log.error("Throwable...{}", t);
return "Throwable";
}
}
@SentineResource注解能够指定两个参数用来处理出异常后的逻辑,blockHandler和fallback。
其中blockHandler捕获的是BlockException,该异常是流控异常、降级异常、热点异常等五个异常的父类,也就是说,Sentinel抛出的异常都会被它捕获,该属性需要指定一个处理方法,该方法的声明也有讲究,返回值和参数必须和原资源方法的声明一致,但可以再多声明一个BlockException参数用于获取异常信息。
而fallback的作用与blockHandler类似,但它的作用域更广,对于资源产生的任何异常,都会被fallback捕获到并进行处理,现在我们对/order/message资源进行流控,则发生流控异常后便会进入我们自定义的处理方法:
若是同时定义这两个属性,则对于Sentinel抛出的异常,blockHandler会优先处理。
若是不想让异常处理方法和业务写在一起,我们也可以使用另外一个属性blockHandlerClass将异常处理抽取到外面去:
@SentinelResource(
value = "message",
blockHandlerClass = OrderMessageBlockHandler.class,
blockHandler = "blockHandler",
fallback = "fallback")
public String message(String name) {
return "message";
}
此时在外部定义OrderMessageBlockHandler类:
@Slf4j
public class OrderMessageBlockHandler {
public static String blockHandler(String name, BlockException e) {
log.error("BlockException...{}", e);
return "BlockException";
}
}
要注意此时的异常处理方法必须声明为静态的,fallback也可以如此方式进行处理。
Sentinel持久化
到这里关于Sentinel的各种规则就介绍完了,但是我们仍然有一个问题没有解决,就是每次设置的规则在应用启动后就失效了,需要重新设置,这是因为Sentinel默认将规则设置保存在了内存中,所以要是想将规则保存下来,我们就需要对其进行持久化。
编写一个类:
package com.wwj.config;
import com.alibaba.csp.sentinel.command.handler.ModifyParamFlowRulesCommandHandler;
import com.alibaba.csp.sentinel.datasource.*;
import com.alibaba.csp.sentinel.init.InitFunc;
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRule;
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRuleManager;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule;
import com.alibaba.csp.sentinel.slots.system.SystemRule;
import com.alibaba.csp.sentinel.slots.system.SystemRuleManager;
import com.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import org.springframework.beans.factory.annotation.Value;
import java.io.File;
import java.io.IOException;
import java.util.List;
public class FilePersistence implements InitFunc {
@Value("spring.application.name")
private String applicationName;
@Override
public void init() throws Exception {
String ruleDir = System.getProperty("user.home") + "/sentinel-rules/" + applicationName;
System.out.println(ruleDir);
String flowRulePath = ruleDir + "/flow-rule.json";
String degradeRulePath = ruleDir + "degrade-rule.json";
String systemRulePath = ruleDir + "system-rule.json";
String authorityRulePath = ruleDir + "authority-rule.json";
String paramFlowRulePath = ruleDir + "param-flow-rule.json";
this.mkdirIfNotExists(ruleDir);
this.createFileIfNotExists(flowRulePath);
this.createFileIfNotExists(degradeRulePath);
this.createFileIfNotExists(systemRulePath);
this.createFileIfNotExists(authorityRulePath);
this.createFileIfNotExists(paramFlowRulePath);
// 流控规则
ReadableDataSource<String, List<FlowRule>> flowRuleRDS = new FileRefreshableDataSource<List<FlowRule>>(
flowRulePath,
flowRuleListParser
);
FlowRuleManager.register2Property(flowRuleRDS.getProperty());
WritableDataSource<List<FlowRule>> flowRuleWDS = new FileWritableDataSource<List<FlowRule>>(
flowRulePath,
this::encodeJson
);
WritableDataSourceRegistry.registerFlowDataSource(flowRuleWDS);
// 降级规则
ReadableDataSource<String, List<DegradeRule>> degradeRuleRDS = new FileRefreshableDataSource<List<DegradeRule>>(
degradeRulePath,
degradeRuleListParser
);
DegradeRuleManager.register2Property(degradeRuleRDS.getProperty());
WritableDataSource<List<DegradeRule>> degradeRuleWDS = new FileWritableDataSource<List<DegradeRule>>(
degradeRulePath,
this::encodeJson
);
WritableDataSourceRegistry.registerDegradeDataSource(degradeRuleWDS);
// 系统规则
ReadableDataSource<String, List<SystemRule>> systemRuleRDS = new FileRefreshableDataSource<>(
systemRulePath,
systemRuleListParser
);
SystemRuleManager.register2Property(systemRuleRDS.getProperty());
WritableDataSource<List<SystemRule>> systemRuleWDS = new FileWritableDataSource<List<SystemRule>>(
systemRulePath,
this::encodeJson
);
WritableDataSourceRegistry.registerSystemDataSource(systemRuleWDS);
// 授权规则
ReadableDataSource<String, List<AuthorityRule>> authorityRuleRDS = new FileRefreshableDataSource<List<AuthorityRule>>(
authorityRulePath,
authorityRuleListParser
);
AuthorityRuleManager.register2Property(authorityRuleRDS.getProperty());
WritableDataSource<List<AuthorityRule>> authorityRuleWDS = new FileWritableDataSource<List<AuthorityRule>>(
authorityRulePath,
this::encodeJson
);
WritableDataSourceRegistry.registerAuthorityDataSource(authorityRuleWDS);
// 热点规则
ReadableDataSource<String, List<ParamFlowRule>> paramFlowRuleRDS = new FileRefreshableDataSource<List<ParamFlowRule>>(
paramFlowRulePath,
paramFlowRuleListParser
);
AuthorityRuleManager.register2Property(authorityRuleRDS.getProperty());
WritableDataSource<List<ParamFlowRule>> paramFlowRuleWDS = new FileWritableDataSource<List<ParamFlowRule>>(
paramFlowRulePath,
this::encodeJson
);
ModifyParamFlowRulesCommandHandler.setWritableDataSource(paramFlowRuleWDS);
}
private Converter<String, List<FlowRule>> flowRuleListParser = source -> JSON.parseObject(
source,
new TypeReference<List<FlowRule>>() {
}
);
private Converter<String, List<DegradeRule>> degradeRuleListParser = source -> JSON.parseObject(
source,
new TypeReference<List<DegradeRule>>() {
}
);
private Converter<String, List<SystemRule>> systemRuleListParser = source -> JSON.parseObject(
source,
new TypeReference<List<SystemRule>>() {
}
);
private Converter<String, List<AuthorityRule>> authorityRuleListParser = source -> JSON.parseObject(
source,
new TypeReference<List<AuthorityRule>>() {
}
);
private Converter<String, List<ParamFlowRule>> paramFlowRuleListParser = source -> JSON.parseObject(
source,
new TypeReference<List<ParamFlowRule>>() {
}
);
private void mkdirIfNotExists(String filePath) throws IOException {
File file = new File(filePath);
if (!file.exists()) {
file.mkdirs();
}
}
private void createFileIfNotExists(String filePath) throws IOException {
File file = new File(filePath);
if (!file.exists()) {
file.createNewFile();
}
}
private <T> String encodeJson(T t) {
return JSON.toJSONString(t);
}
}
然后需要在resource目录下创建一个文件夹,名字必须为 META-INF.services
,并在该文件夹下新建一个文件,名字必须为 com.alibaba.csp.sentinel.init.initFunc
,在文件中写上持久化类的全类名:
com.wwj.config.FilePersistence
最后重新启动服务,然后设置一个流控规则,该规则就会被持久化到本地磁盘上:
OpenFeign整合Sentinel
接下来我们通过OpenFeign整合Sentinel来实现服务容错,首先开启OpenFeign对Sentinel的支持:
feign:
sentinel:
enabled: true
还记得我们在前面写的OpenFeign接口吗?若是想实现远程调用,我们需要编写一个接口,并声明方法:
@FeignClient(value = "service-product", fallback = ProductServiceFallback.class)
public interface ProductService {
@RequestMapping("/product/{pid}")
Product product(@PathVariable("pid") Integer pid);
}
一般该方法的声明与要调用的远程方法声明相同,若想实现对该方法的容错,我们首先需要在@FeignClient注解中添加fallback属性,它将用于指定发生容错后进行的处理,所以创建ProductServiceFallback类:
/**
* 这是一个容错类,需要实现Feign所在的接口,并实现接口中的所有方法
*/
@Service
public class ProductServiceFallback implements ProductService {
@Override
public Product product(Integer pid) {
Product product = new Product();
product.setPid(-1);
product.setPname("远程调用失败,执行容错方法");
return product;
}
}
该类需要实现Feign声明的接口,并实现接口中的所有方法,当Feign接口远程调用失败时,就会在该类中寻找与调用的Feign接口中同名的方法进行调用,此时在调用远程接口时就可以知晓是否调用失败并做对应的处理:
@RequestMapping("/order/product/{pid}")
public Order order(@PathVariable("pid") Integer pid) {
log.info("调用订单服务,商品编号:{}", pid);
Product product = productService.product(pid);
if (product.getPid() == -1) {
// 此时说明远程调用失败,执行了容错处理
Order order = new Order();
order.setOid(-1);
order.setPname("下单失败");
return order;
}
// 创建订单
Order order = new Order();
// 模拟数据
order.setUid(1);
order.setUsername("测试用户");
order.setPid(pid);
order.setPname(product.getPname());
order.setPprice(product.getPprice());
order.setNumber(1);
orderService.createOrder(order);
log.info("订单创建成功,订单信息:{}", JSON.toJSONString(order));
return order;
}
测试一下:
这里因为自身的网络问题,第一次下单就失败了,也正好测试出了远程服务调用失败的效果。
然而它也有一定的缺点,即使远程调用失败了,控制台也不会输出任何错误信息的日志,这对于我们的错误排查是非常不利的,为此,我们可以使用另外一种容错方式:
@Service
@Slf4j
public class ProductServiceFallbackFactory implements FallbackFactory<ProductService> {
@Override
public ProductService create(Throwable throwable) {
return new ProductService() {
@Override
public Product product(Integer pid) {
log.error("{}", throwable);
Product product = new Product();
product.setPid(-1);
product.setPname("远程调用失败,执行容错方法");
return product;
}
};
}
}
该方法会携带一个异常信息,我们将其进行输出即可,然后修改一下远程接口:
@FeignClient(value = "service-product", fallbackFactory = ProductServiceFallbackFactory.class)
public interface ProductService {
@RequestMapping("/product/{pid}")
Product product(@PathVariable("pid") Integer pid);
}
设置fallbackFactory属性即可,此时当调用失败后,控制台便会输出错误日志:
服务网关--Gateway
当客户端访问这些微服务时,必须要知道服务的ip和端口,然而服务往往是有集群的,也就是说,一个服务一般有很多个一起提供服务,客户端要是想实现高可用,就必须知晓每个服务的ip和端口;再者,当实现一些权限认证的功能时,我们需要对每个涉及到的服务都做一套相同的逻辑,这些问题都等待着我们去解决,那么有没有一个好的解决方案呢,当然是有的,它就是服务网关:Gateway。
服务网关,就是指系统的统一入口,它封装了应用程序的内部结构,为用户提供统一服务,一些与业务本身功能无关的公共逻辑可以在这里实现,诸如认证、鉴权、监控、路由转发等等。
首先创建一个网关服务,并引入依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
因为Gateway的实现依赖于Netty和WebFlux,所以我们不能在该服务中引入 spring-boot-starter-web
的依赖,该依赖中包含了Tomcat,会影响到网关的功能。
进行配置:
server:
port: 7000
spring:
application:
name: shop-gateway
cloud:
gateway:
routes: # 路由集合
- id: product_route # 当前路由的唯一标识
uri: http://localhost:8081 # 请求最终要转发到的地址
order: 1 # 路由的优先级,数字越小优先级越高
predicates: # 断言,转发请求要满足的条件
- Path=/product-serv/** # 当请求路径满足Path指定的规则时,此路由信息才会正常转发
filters: # 过滤器,在请求传递过程中对请求进行处理
- StripPrefix=1 # 在请求转发之前去掉一层路径
这里重点是对路由的配置,首先id是当前路由的唯一标识,uri是请求需要转发到的地址,这里我们设置了一种路由规则,即当地址为 http://localhost:7000/product-serv/**
时,会被该路由转发到商品服务,所以predicates需要判断当前请求是否以 product-serv
开头,如果满足条件,则证明需要访问的是商品服务,比如这样的一个url: http://localhost:7000/product-serv/product/1
,然而经过路由转发后的url却是这样的状态: http://localhost:8081/product-serv/product/1
,它比我们真实的服务地址要多了一层product-serv路径,所以我们在filters中将其去掉即可。
测试一下:
没有任何问题,我们访问的是网关,获取到了商品服务数据。
然而这里的路由配置是有问题的,因为uri属性值写死了,最终我们应该是从Nacos中获取服务信息而不是写死在程序里,那么该如何实现呢?
首先在网关服务中引入Nacos的依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
只需要进行一些简单的配置即可:
server:
port: 7000
spring:
application:
name: shop-gateway
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
gateway:
discovery:
locator:
enabled: true # 让gateway可以发现nacos中的服务
routes: # 路由集合
- id: product_route # 当前路由的唯一标识
uri: lb://service-product # 从Nacos中获取指定名称的服务,并遵循负载均衡策略
order: 1 # 路由的优先级,数字越小优先级越高
predicates: # 断言,转发请求要满足的条件
- Path=/product-serv/** # 当请求路径满足Path指定的规则时,此路由信息才会正常转发
filters: # 过滤器,在请求传递过程中对请求进行处理
- StripPrefix=1 # 在请求转发之前去掉一层路径
内置路由断言
Gateway提供了多种断言工厂用于路由匹配,比如刚才使用的 Path
,它能够对请求路径进行匹配,还有一些其它类型的路由工厂,详细如下:
- 基于Datetime类型的断言工厂
此类型的断言根据时间判断,主要有三个:
AfterRoutePredicateFactory:接收一个日期参数,判断请求日期是否晚于指定日期
BeforeRoutePredicateFactory:接收一个日期参数,判断请求日期是否早于指定日期
BetweenRoutePredicateFactory:接收两个日期参数,判断请求日期是否在指定时间段内
格式为:
-After=2021-03-21T23:59:59.789+08:00[Asia/Shanghai]
- 基于远程地址的断言工厂
RemoteAddrRoutePredicateFactory:接收一个IP地址段,判断请求主机地址是否在地址段中
格式为:
-RemoteAddr=192.168.147.1/24
- 基于Cookie的断言工厂
CookieRoutePredicateFactory:接收两个参数,Cookie名字和正则表达式,判断请求Cookie是否具有给定名称且值是否与正则表达式匹配
格式为:
-Cookie=chocolate,ch
- 基于Header的断言工厂
HeaderRoutePredicateFactory:接收两个参数,标题名称和正则表达式,判断请求Header是否具有给定名称且值是否与正则表达式匹配
格式为:
-Header=X-Request-Id,\d+
- 基于Host的断言工厂
HostRoutePredicateFactory:接收一个参数,主机名模式,判断请求的Host是否满足匹配规则
格式为:
-Host=**.testhost.org
- 基于Method请求方法的断言工厂
MethodRoutePredicateFactory:接收一个参数,判断请求的类型是否跟指定的类型匹配
格式为:
-Method=GET
- 基于Path请求路径的断言工厂
PathRoutePredicateFactory:接收一个参数,判断请求的URI部分是否满足路径规则
格式为:
-Path=/foo/(segment)
- 基于Query请求参数的断言工厂
QueryRoutePredicateFactory:接收两个参数,请求Param和正则表达式,判断请求参数是否具有给定名称且值是否与正则表达式匹配
格式为:
-Query=baz,ba
- 基于路由权重的断言工程
WeightRoutePredicateFactory:接收一个[组名,权重],然后对于同一个组内的路由按照权重转发
格式为:
routes:
-id:weight_route1
uri:host1
predicates:
-Path=/product/**
-Weight=group3,1
-id:weight_route2
uri:host2
predicates:
-Path=/product/**
-Weight=group3,9
自定义路由断言
对于特殊的业务场景,Gateway提供的断言往往无法直接解决我们的需求,所以我们需要自定义断言工厂,比如只让年龄在某个范围内的用户进行访问,那么首先需要进行配置:
spring:
cloud:
gateway:
routes: # 路由集合
- id: product_route
uri: lb://service-product
order: 1
predicates:
- Path=/product-serv/**
- Age=18,50
filters:
- StripPrefix=1
然后自定义断言工厂,实现断言方法:
@Component
public class AgeRoutePredicateFactory extends AbstractRoutePredicateFactory<AgeRoutePredicateFactory.Config> {
public AgeRoutePredicateFactory() {
super(AgeRoutePredicateFactory.Config.class);
}
/**
* 读取配置文件中的参数值,并将其赋值到配置类中
*
* @return
*/
@Override
public List<String> shortcutFieldOrder() {
// 该属性值的顺序必须和配置文件中的属性值一致
return Arrays.asList("minAge,maxAge");
}
@Override
public Predicate<ServerWebExchange> apply(AgeRoutePredicateFactory.Config config) {
return new Predicate<ServerWebExchange>() {
@Override
public boolean test(ServerWebExchange serverWebExchange) {
// 接收age参数
String ageStr = serverWebExchange.getRequest().getQueryParams().getFirst("age");
if (!StringUtils.isEmpty(ageStr)) {
int age = Integer.parseInt(ageStr);
if (age > config.getMinAge() && age < config.getMaxAge()) {
return true;
} else {
return false;
}
}
return false;
}
};
}
/**
* 配置类,用于接收配置文件中配置的参数
*/
@Data
public static class Config {
private int minAge;
private int maxAge;
}
}
需要注意的是,自定义的断言工厂命名格式必须为配置文件中的配置名(Age)+ RoutePredicateFactory。
网关限流
网关是所有请求的公共入口,所以可以在网关进行限流,而且限流的方式很多,我们可以采用Sentinel来实现网关的限流。
引入依赖:
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-spring-cloud-gateway-adapter</artifactId>
</dependency>
基于Sentinel的Gateway限流是通过其提供的Filter来完成的,使用时只需注入对应的SentinelGatewayFilter实例以及SentinelGatewayBlockExceptionHandler实例即可:
package com.wwj.config;
@Configuration
public class GatewayConfiguration {
private final List<ViewResolver> viewResolvers;
private final ServerCodecConfigurer serverCodecConfigurer;
public GatewayConfiguration(ObjectProvider<List<ViewResolver>> viewResolversProvider
, ServerCodecConfigurer serverCodecConfigurer) {
this.viewResolvers = viewResolversProvider.getIfAvailable(Collections::emptyList);
this.serverCodecConfigurer = serverCodecConfigurer;
}
/**
* 初始化一个限流过滤器
*
* @return
*/
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public GlobalFilter sentinelGatewayFilter() {
return new SentinelGatewayFilter();
}
/**
* 配置初始化限流参数
*/
@PostConstruct
public void initGatewayRules() {
Set<GatewayFlowRule> rules = new HashSet<>();
rules.add(
new GatewayFlowRule("product_route") // 资源名称,对应的路由id
.setCount(1) // 限流阈值
.setIntervalSec(1) // 统计时间窗口,单位:秒
);
GatewayRuleManager.loadRules(rules);
}
/**
* 配置限流的异常处理器
*
* @return
*/
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public SentinelGatewayBlockExceptionHandler sentinelGatewayBlockExceptionHandler() {
return new SentinelGatewayBlockExceptionHandler(viewResolvers, serverCodecConfigurer);
}
/**
* 自定义限流异常页面
*/
@PostConstruct
public void initBlockHandlers() {
BlockRequestHandler blockRequestHandler = new BlockRequestHandler() {
@Override
public Mono<ServerResponse> handleRequest(ServerWebExchange serverWebExchange, Throwable throwable) {
Map<String, Object> map = new HashMap<>();
map.put("code", 0);
map.put("message", "接口被限流了...");
return ServerResponse.status(HttpStatus.OK)
.contentType(MediaType.APPLICATION_JSON_UTF8)
.body(BodyInserters.fromObject(map));
}
};
GatewayCallbackManager.setBlockHandler(blockRequestHandler);
}
}
该方式是针对某个路由进行的限流,限流的路由id为 product_route
。
服务配置--Nacos
在微服务架构中,一个系统往往由大量微服务支撑,而系统配置文件被分布在各个微服务内,管理非常不便,为此,我们需要使用服务配置中心来解决这一问题,阿里巴巴开源的Nacos组件不仅可以作为服务注册中心,还能作为服务配置中心统一管理配置。
引入依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
此时我们将商品微服务的配置文件删掉,并创建一个新的配置文件,名字必须为 bootstrap.yml
:
spring:
application:
name: service-product
cloud:
nacos:
config:
server-addr: 127.0.0.1:8848 # 配置中心地址
file-extension: yaml # 配置格式
profiles:
active: dev # 环境标识
然后点击Nacos后台的配置列表,并点击加号添加配置:
填写相关内容:
配置格式选择YAML,将配置写在下面,需要注意的是Data ID,它的命名遵循一个原则:服务名 + 环境标识 + 配置格式,所以这里的Data ID为 service-product-dev.yaml
。
运行商品服务进行测试:
从日志信息来看,配置是生效的。
现在的配置虽然生效了,但若是想要读取配置,当在Nacos后台修改了配置后,微服务并不能立马感知到,为此,Nacos提供了自动刷新功能,只需在控制器上声明@RefreshScope注解即可:
@RestController
@RefreshScope
public class NacosConfigConotroller {
@Value("${spring.application.name}")
private String name;
@GetMapping("/nacos-config")
public String nacosConfig() {
return name;
}
}
当配置越来越多的时候,我们就可以将重复的配置抽取出来,实现配置共享,比如:
只需将 微服务名.yaml
作为Data ID即可,这样它就会成为一个公共的配置,接下来就可以创建 service-product-dev.yaml
、 service-product-test.yaml
、 service-produt-prod.yaml
,它们都将共享 service-product.yaml
配置。
这种方式只适用于同一个微服务下的配置共享,若是想与其它微服务共享配置,则在Nacos后台创建一个公共的配置 all-service.yaml
,然后在需要使用该配置的微服务中进行配置:
spring:
application:
name: service-product
cloud:
nacos:
config:
server-addr: 127.0.0.1:8848 # 配置中心地址
file-extension: yaml # 文件格式
shared-dataids: all-service.yaml # 需要引入的公共配置ID
refreshable-dataids: all-service.yaml # 动态刷新指定配置
profiles:
active: dev # 环境标识
Nacos还为配置进行了进一步的划分:
- 命名空间
- 命名分组
- 配置集
通过它们,能够使各个微服务之间的配置更加清晰明了。
对于命名空间,一般用于区分不同环境下的配置,比如:
点击左侧菜单栏的命名空间,再点击右上角的新建命名空间即可添加命名空间,这里已经添加了三个命名空间,分别是测试、生产和开发环境,此时我们在新建配置的时候就可以选择命名空间:
此时微服务若是想读取该配置,就必须声明命名空间:
spring:
application:
name: service-product
cloud:
nacos:
config:
server-addr: 127.0.0.1:8848 # 配置中心地址
file-extension: yaml # 文件格式
namespace: 07099085-e140-4db7-b342-5450d6d934a0
namespace的值就是命名空间后面的那一段字符串:
而命名分组是将不同微服务的配置归类到同一个分组,比如将整个项目的所有微服务配置归属到一个分组中:
只需在新建配置时指定Group即可。
最后一个是配置集,配置集的意识实质上就是指一个配置文件,它已经是配置中划分最小的部分了。
它们的关系为:一个命名空间(环境)下有多个命名分组(项目),每个命名分组(项目)下又有多个配置集(配置文件)。
分布式事务
事务是一个大家都非常熟悉的话题了,我们直接来了解一下分布式架构中存在的事务问题。
以转账为例,对于本地事务来说,这一操作非常简单,减少张三的账户余额并增加李四的账户余额,将这两项操作用事务管理起来即可;然而分布式中存在这样的一个问题,张三账户和李四账户有可能并不在同一个数据库中,甚至不在同一个服务器上,对于这种情况,我们该如何保证事务操作呢?
对于分布式事务,其解决方案有四种:
- 全局事务
- 可靠消息服务
- 最大努力通知
- TCC事务
下面分别介绍:
全局事务
全局事务提出了三个角色:
- AP:Application 应用系统
- TM:Transaction Manager 事务管理器
- RM:Resource Manager 资源管理器
它将整个事务分成两个阶段:
- 表决阶段:所有参与者都将本地事务执行预提交,并将能否成功的信息反馈发给协调者
- 执行阶段:协调者根据所有参与者的反馈,通知所有参与者,步调一致地执行提交或者回滚
以下单流程为例,订单服务在生成订单的时候会先向事务管理器提供反馈,告诉他订单能够正常生成,而库存服务在减库存之前也会先向事务管理器提供反馈,当事务管理器接收到这两个微服务的正确反馈后,会通知这两个微服务同时执行刚才的操作,若是其中有一个微服务提供了错误的反馈,则通知微服务撤销刚才的操作即可。
这种方式只是降低了出现分布式事务问题的概率,并没有彻底地解决问题,因为在事务管理器通知微服务执行操作时仍然可能出现事务问题。
可靠消息服务
可靠消息服务是基于消息中间件实现的,具体方式如下:
- 在订单服务插入订单数据前,向消息中间件发送一条消息
- 消息中间件收到该条消息后将其持久化,但并不投递,当持久化成功后,向订单服务进行反馈
- 订单服务收到反馈后,开始插入订单数据
- 订单数据插入成功后,向消息中间件发送Commit或Rollback请求,该请求发送完成后,对订单服务的处理就结束了
如果消息中间件收到Commit请求,则向库存服务投递消息,如果收到的是Rollback请求,则直接丢弃消息,但如果这两种请求都未收到,那么就触发消息回查机制,主动调用订单服务提供的事务询问接口查询当前系统的状态,该接口可能返回三种结果,消息中间件将分别对这三种结果作出不同的处理:
- 提交:将消息投递给库存服务
- 回滚:将消息直接丢弃
- 处理中:继续等待
- 消息中间件向库存服务投递完消息后就会进入堵塞等待状态,库存服务便立即执行减库存的操作,操作完成后向消息中间件进行反馈
- 如果消息中间件收到正确反馈则认为事务处理完毕
- 如果消息中间件在等待确认反馈过程中超时则会重新投递消息,直到库存服务返回正确反馈为止
最大努力通知
最大努力通知其实是对可靠消息服务的进一步优化,它引入了本地消息表来记录错误消息,然后加入失败消息的定期校验,来进一步保证消息被下游服务消费。
流程如下:
- 处理业务的同一事务中,向本地消息表写入一条记录,也就是说,业务的处理和写入记录处在同一个事务中,当向本地消息表写入了记录后,就代表业务一定是处理成功的
- 准备专门的消息发送者不断地发送本地消息表中的消息给消息中间件,如果发生失败则重试
- 消息中间件收到消息后负责将该消息同步投递给相应的下游服务,并触发下游服务的业务处理
- 当下游服务处理成功后,向消息中间件进行反馈,消息中间件便可以将该条消息删除,从而该事务完成
- 对于投递失败的消息,利用重试机制进行重试,对于重试失败的,写入错误消息表
- 消息中间件需要提供失败消息的查询接口,下游服务会定期查询失败消息, 并将其消费掉
TCC事务
TCC事务属于补偿型的分布式事务,TCC实现分布式事务有三个步骤:
- 尝试待执行的业务:该过程并未执行业务,只是完成所有业务的一致性检查,并预留好执行所需的全部资源
- 确认执行业务:该过程不做任何业务检查,只使用第一步预留的业务资源。通常情况下,采用TCC则认为该阶段是不会出错的,也就是说,第一步只要成功了,那么这一步一定会成功,若是很不幸真的出错了,需要引入重试机制或人工进行干预
- 取消待执行的业务:取消第一步预留的业务资源,通常情况下,采用TCC则认为这一步也是不会出错的,若是真的出错了,则引入重试机制或人工干预
Seata
Seata是阿里巴巴开源的一个组件,它用于解决分布式事务问题,其愿景是让分布式事务的使用像本地事务一样简单和高效,并逐步解决开发者们遇到的分布式事务方面的所有难题。
这里仍然以下单操作模拟整个流程,首先编写控制方法:
@RestController
@Slf4j
public class OrderController {
@Autowired
private OrderService orderService;
@RequestMapping("/order/product/{pid}")
public Order order(@PathVariable("pid") Integer pid) {
return orderService.createOrder(pid);
}
}
在Service中提供该方法:
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private ProductService productService;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public Order createOrder(Integer pid) {
log.info("调用订单服务,商品编号:{}", pid);
Product product = productService.product(pid);
// 创建订单
Order order = new Order();
// 模拟数据
order.setUid(1);
order.setUsername("测试用户");
order.setPid(pid);
order.setPname(product.getPname());
order.setPprice(product.getPprice());
order.setNumber(1);
orderMapper.insert(order);
log.info("订单创建成功,订单信息:{}", JSON.toJSONString(order));
// 减库存
productService.reduceInventory(pid, order.getNumber());
// 向RocketMQ投递一个下单成功的消息
rocketMQTemplate.convertAndSend("order-topic",order);
return order;
}
}
该方法在向消息中间件投递消息前调用了远程商品服务实现了减库存操作,所以编写远程接口:
@FeignClient(value = "service-product")
public interface ProductService {
@RequestMapping("/product/{pid}")
Product product(@PathVariable("pid") Integer pid);
@RequestMapping("/product/reduceInventory")
void reduceInventory(@RequestParam("pid") Integer pid, @RequestParam("number") Integer number);
}
相应地,商品服务就需要提供该接口:
@RestController
@Slf4j
public class ProductController {
@Autowired
private ProductService productService;
@RequestMapping("/product/{pid}")
public Product product(@PathVariable("pid") Integer pid) {
log.info("调用商品服务,查询{}号商品", pid);
Product product = productService.queryProductByPid(pid);
log.info("查询成功,商品信息:{}", JSON.toJSONString(product));
return product;
}
@RequestMapping("/product/reduceInventory")
public void reduceInventory(@RequestParam("pid") Integer pid, @RequestParam("number") Integer number) {
productService.reduceInventory(pid, number);
}
}
这里调用了Service的方法,所以编写Service:
@Service
public class ProductServiceImpl implements ProductService {
@Autowired
private ProductMapper productMapper;
@Override
public Product queryProductByPid(Integer pid) {
return productMapper.selectById(pid);
}
@Override
public void reduceInventory(Integer pid, Integer number) {
Product product = productMapper.selectById(pid);
product.setStock(product.getStock() - number);
productMapper.insert(product);
}
}
这样整套流程就编写完成了,接下来制造一个异常来检验事务:
@Transactional
@Override
public void reduceInventory(Integer pid, Integer number) {
Product product = productMapper.selectById(pid);
product.setStock(product.getStock() - number);
// 模拟异常
int i = 1 / 0;
productMapper.insert(product);
}
在保存减库存操作之前产生了一个异常,此时执行下单操作就会出现订单创建成功而库存并没有减少的情况,这是一个非常危险的操作,想象一下,一个商家原本打算拿出100个手机用于促销活动,可因为异常情况导致的事务问题,使得用户能够疯狂下订单,而库存依然充足。
接下来我们就是用Seata来解决这一问题,首先下载Senta,下载地址:https://github.com/seata/seata/releases/download/v1.4.1/seata-server-1.4.1.zip
下载完成后解压即可得到Seata,首先修改conf目录下的file.conf文件:
然后修改registry.conf文件:
然后需要将Seata的配置导入到Nacos中,但是1.4.1版本的Seata是没有提供这样的脚本的,我们需要自己去获取脚本,下载地址:https://github.com/seata/seata/blob/develop/script/config-center/nacos/nacos-config.sh
还需要config.txt文件,这在1.4.1版本的Seata中也是没有的,下载地址:https://github.com/seata/seata/blob/develop/script/config-center/config.txt
将这里改成自己数据库的用户名和密码:
将这里原先的配置删掉,然后添加上这两项配置:
千万注意config.txt文件需要放在seata的根目录下,现在就可以将配置全部导入到Nacos中了,在conf目录下启动命令行窗口,执行指令:
nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 48285e4a-d213-47c1-81a3-b9ff577de2b8 -u nacos -w nacos
-h指定Nacos的ip,-p指定端口号,-g指定命名分组,-t指定命名空间,这里我创建了一个seata的命名空间:
-u和-w分别是登录Nacos的用户名和密码,当脚本执行完毕后,查看Nacos后台:
现在配置就全部导入进来了,紧接着就可以启动Seata了:
cd bin
seata-server.bat -p 9000 -m file
启动完成后,就可以在Nacos后台看到一个新的服务:
这样就表示启动成功了,之后我们要准备一下Seata需要使用到的数据表,因为Seata是通过记录事务日志来提交或回滚事务的,所以首先要创建一张日志表:
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
将这张表创建在我们的业务数据库中,然后将下面这些表创建在seata数据库中:
-- the table to store GlobalSession data
drop table if exists `global_table`;
create table `global_table` (
`xid` varchar(128) not null,
`transaction_id` bigint,
`status` tinyint not null,
`application_id` varchar(32),
`transaction_service_group` varchar(32),
`transaction_name` varchar(128),
`timeout` int,
`begin_time` bigint,
`application_data` varchar(2000),
`gmt_create` datetime,
`gmt_modified` datetime,
primary key (`xid`),
key `idx_gmt_modified_status` (`gmt_modified`, `status`),
key `idx_transaction_id` (`transaction_id`)
);
-- the table to store BranchSession data
drop table if exists `branch_table`;
create table `branch_table` (
`branch_id` bigint not null,
`xid` varchar(128) not null,
`transaction_id` bigint ,
`resource_group_id` varchar(32),
`resource_id` varchar(256) ,
`lock_key` varchar(128) ,
`branch_type` varchar(8) ,
`status` tinyint,
`client_id` varchar(64),
`application_data` varchar(2000),
`gmt_create` datetime,
`gmt_modified` datetime,
primary key (`branch_id`),
key `idx_xid` (`xid`)
);
-- the table to store lock data
drop table if exists `lock_table`;
create table `lock_table` (
`row_key` varchar(128) not null,
`xid` varchar(96),
`transaction_id` long ,
`branch_id` long,
`resource_id` varchar(256) ,
`table_name` varchar(32) ,
`pk` varchar(36) ,
`gmt_create` datetime ,
`gmt_modified` datetime,
primary key(`row_key`)
);
接下来就可以编写业务代码了,在订单服务和商品服务中引入依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
然后在两个微服务中都向容器里注册一个代理数据源:
@Configuration
public class DataSourceProxyConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
@Primary
@Bean
public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}
接着在两个微服务的resource目录下都添加一个registry.conf文件:
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
loadBalance = "RandomLoadBalance"
loadBalanceVirtualNodes = 10
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = "public"
cluster = "default"
username = "nacos"
password = "nacos"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = "public"
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
}
}
创建bootstrap.yml文件:
spring:
application:
name: product-service
cloud:
nacos:
config:
server-addr: 127.0.0.1:8848
namespace: 48285e4a-d213-47c1-81a3-b9ff577de2b8 # seata命名空间标识
group: SEATA_GROUP
alibaba:
seata:
tx-service-group: ${spring.application.name}
我们需要保证的是tx-service-group与config.txt中的配置一致:
到这里所有的准备工作就完成了,接下来若是想解决全局事务问题,只需要用到一个@GlobalTransactional 注解:
@GlobalTransactional // 全局事务控制
@Override
public Order createOrder(Integer pid) {
log.info("调用订单服务,商品编号:{}", pid);
Product product = productService.product(pid);
// 创建订单
Order order = new Order();
// 模拟数据
order.setUid(1);
order.setUsername("测试用户");
order.setPid(pid);
order.setPname(product.getPname());
order.setPprice(product.getPprice());
order.setNumber(1);
orderMapper.insert(order);
log.info("订单创建成功,订单信息:{}", JSON.toJSONString(order));
// 减库存
productService.reduceInventory(pid, order.getNumber());
// 向RocketMQ投递一个下单成功的消息
rocketMQTemplate.convertAndSend("order-topic",order);
return order;
}
以上便是有关SpringCloud Alibaba技术栈的全部内容。