一、引子
抖音的推荐系统是其成功的关键之一,而背后是一套复杂的微服务架构支撑着高并发和庞大的用户数据处理。每当用户刷到新的视频时,背后都有一个复杂的推荐算法在实时运行。而在这样的场景下,构建一个高效、可扩展的微服务架构是至关重要的。本文将通过 Spring Cloud 构建一个简化版的抖音推荐系统,探讨微服务架构的设计与实践。
二、业务梳理
在正式的开发前,我们需要先对这个简化版的推荐系统所需要的功能做下梳理:
用户行为数据
推荐系统的核心在于个性化推荐,而个性化推荐的前提是对用户行为的全面了解。用户的每一次操作(如观看、点赞、转发、评论等)都会影响推荐结果。因此,系统需要具备以下功能:
- 记录用户行为数据:记录用户在平台上与视频的交互行为(比如用户观看了哪些视频、点赞了哪些视频等)。
- 管理用户画像:基于用户的历史行为,生成用户的兴趣画像,用于推荐计算。
视频资源管理
抖音作为一个短视频平台,需要管理大量的视频资源。每个视频都有不同的标签(如类型、话题、风格等),这些标签是推荐算法的重要依据。因此,系统需要:
- 存储视频基本信息:包括视频的ID、标题、标签、上传时间等。
- 提供视频分类:根据视频的标签信息,将视频分类以便后续推荐。
个性化推荐
推荐系统的核心功能就是根据用户的兴趣和视频内容的标签,生成个性化推荐列表。为了实现这一功能,系统需要:
- 获取用户画像和视频标签:结合用户的兴趣画像与视频的标签,匹配用户可能感兴趣的视频。
- 生成推荐列表:根据算法计算,生成个性化推荐的视频列表并返回给用户。
用户行为数据的实时处理
用户在平台上的行为是实时发生的,因此推荐系统需要能够实时处理这些行为数据,并根据最新的行为更新用户画像。为此,系统需要:
- 实时处理用户行为:当用户进行某个操作时(如点赞或观看某个视频),系统能够实时接收这些事件,并更新用户画像。
- 异步处理:为了不影响用户的使用体验,行为数据的处理应尽量异步化,通过消息队列等手段解耦实时数据处理与推荐服务。
通过上述的业务需求梳理,我们最终可以总结出一个简化版的推荐系统需要具备的核心功能:
- 用户行为管理:记录用户的观看、点赞等行为。
- 视频资源管理:存储视频的基本信息和标签。
- 个性化推荐:结合用户画像和视频标签,生成推荐列表。
- 实时数据处理:处理用户行为数据,实时更新用户画像。
三、架构设计
完成了需求梳理后,我们总结了四大核心功能,进而可以抽象出四个服务来分别完成上述功能,因此,我们可以简单地绘制下业务架构图,如下:
这里我做了一些简化:比如省略了API网关,客户端直接请求到推荐服务上获取响应。因为我们决定使用SpringCloud来搭建这个项目,所以我们的服务都注册到Eureka上,服务之间的调用则采用Feign实现。另外,分别使用Redis和Kafka来缓存用户画像和传递用户行为数据。
四、具体实现
通过需求梳理、技术选型、架构设计,我们已经完成了项目开发前的准备工作,现在就可以正式进行开发了。首先需要我们根据业务架构图完成项目的基础搭建。
项目搭建
使用IDEA通过maven-archetype-quickstart
进行快速创建,如下:
接下来通过同样的方式分别创建四大业务模块和Eureka服务模块,项目结构如下:
此时,在父模块的pom.xml文件中就可以看到子模块都已经被管理起来,我们再引入SpringBoot和
SpringCloud的依赖(Ps:版本可根据喜好自行选择,我这里演示也无所谓了,就不用SpringBoot3了),如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itasass</groupId>
<artifactId>recommendation-system</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>eureka-server</module>
<module>user-service</module>
<module>video-service</module>
<module>recommendation-service</module>
<module>data-processing-service</module>
</modules>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring.boot.version>2.6.4</spring.boot.version>
<spring.cloud.version>2021.0.1</spring.cloud.version>
</properties>
<dependencyManagement>
<dependencies>
<!-- Spring Boot Dependencies-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- Spring Cloud Dependencies-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>
此时,我们就完成了项目的基础搭建,接下来开始编写各个服务的代码。
创建 Eureka 服务
各个服务都有自己的依赖需要导入,Eureka服务所需依赖如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.itasass</groupId>
<artifactId>recommendation-system</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>eureka-server</artifactId>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Spring Boot Web Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Eureka Server -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
</dependencies>
</project>
导入相关依赖后,可以创建 eureka-server
模块中的主类 EurekaServerApplication
,如下:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication
{
public static void main( String[] args ) {
SpringApplication.run(EurekaServerApplication.class, args);
}
}
然后我们需要对eureka服务
进行配置,在 src/main/resources
目录下创建 application.yml
如下:
server:
port: 8761 # 服务运行的端口
eureka:
client:
register-with-eureka: false
fetch-registry: false
server:
enable-self-preservation: false
完成了上述配置后,我们需要启动Eureka来看看当前的这个服务是否可以正常运行,也很简单,启动 EurekaServerApplication
,访问 http://localhost:8761
,此时可以看到 Eureka Server 的控制台界面,如下:
创建用户服务
依然是先导入依赖,如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.itasass</groupId>
<artifactId>recommendation-system</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>user-service</artifactId>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Spring Boot Web Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Eureka Client -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
同样地需要配置application.yml
,四大业务模块都需要将自己注册到eureka
中,如下:
server:
port: 8081 # 服务运行的端口
spring:
application:
name: user-service # 用户服务的名称
kafka:
bootstrap-servers: localhost:9092 # Kafka 服务器地址
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/ # Eureka Server 的地址
fetch-registry: true # 从 Eureka 拉取服务注册表
register-with-eureka: true # 将自己注册到 Eureka
同理,完善用户服务的主类,这里主要是模拟用户看过的视频,如下:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.Arrays;
import java.util.List;
@SpringBootApplication
@EnableEurekaClient
@RestController
public class UserServiceApplication {
public static void main( String[] args ) {
SpringApplication.run(UserServiceApplication.class, args);
}
@GetMapping("/users/{userId}/history")
public List<String> getUserHistory(@PathVariable String userId) {
// 模拟用户的历史行为数据,这里返回一些示例视频ID
return Arrays.asList("1", "3", "5","7");
}
}
这里说明下,我们本次的实例都采用模拟数据的思路,主要是让大家了解下设计思路,所以就不建立相关的库表了。启动用户服务后就可以在eureka
控制台看到user
服务了,如下:
另外,通过工具(如 Apifox)来访问用户服务的 REST API,如下:
完成了基础的配置,但别忘了在我们的架构设计中用户服务还有一项重任-记录用户行为数据。因此,我们需要在用户服务里编写Kafka的生产者服务,如下:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private static final String TOPIC = "user-behavior-topic"; // Kafka 主题名称
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 发送用户行为到 Kafka
*
* @param userId 用户ID
* @param videoId 视频ID
* @param videoTag 视频标签
* @param isInterested 是否感兴趣 (0: 不感兴趣,1: 感兴趣)
*/
public void sendUserBehavior(String userId, String videoId, String videoTag, int isInterested) {
// 构建消息
String message = String.format("User:%s watched Video:%s [Tag:%s] with interest:%d", userId, videoId, videoTag, isInterested);
kafkaTemplate.send(TOPIC, message);
}
}
同时,我们编写一个记录用户观看视频的行为
的接口来模拟用户刷视频的场景,代码如下:
import com.recommendation.serive.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class UserController {
@Autowired
private KafkaProducerService kafkaProducerService;
/**
* 记录用户观看视频的行为,并发送到 Kafka
* @param userId 用户 ID
* @param videoId 视频 ID
* @param videoTag 视频标签
* @param isInterested 是否感兴趣 (0: 不感兴趣,1: 感兴趣)
*/
@PostMapping("/users/{userId}/watch/{videoId}/{videoTag}/{isInterested}")
public String watchVideo(@PathVariable String userId,
@PathVariable String videoId,
@PathVariable String videoTag,
@PathVariable int isInterested) {
// 调用 Kafka 生产者服务将行为数据(包括视频标签)发送到 Kafka
kafkaProducerService.sendUserBehavior(userId, videoId, videoTag, isInterested);
return String.format("User %s watched video %s with tag %s and interest %d", userId, videoId, videoTag, isInterested);
}
}
通过Apifox
模拟数据进行调用,查看API的响应结果和Kafka来观察消息是否发送成功,如下:
从结果来看,我们的消息发送成功。
创建视频服务
同样地,依然是先完善依赖,如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.itasass</groupId>
<artifactId>recommendation-system</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>video-service</artifactId>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Spring Boot Web Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Eureka Client -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
接着配置application.yml
文件,如下:
server:
port: 8082 # 服务运行的端口
spring:
application:
name: video-service # 视频服务的名称
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/ # Eureka Server 的地址
fetch-registry: true # 从 Eureka 拉取服务注册表
register-with-eureka: true # 将自己注册到 Eureka
接着又是主类,视频服务通过模拟提供了所有的视频,如下:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Arrays;
import java.util.List;
@SpringBootApplication
@EnableEurekaClient
@RestController
public class VideoServiceApplication {
public static void main( String[] args ) {
SpringApplication.run(VideoServiceApplication.class, args);
}
@GetMapping("/videos")
public List<Video> getAllVideos() {
// 模拟视频数据,这里返回一些示例视频
return Arrays.asList(
new Video("1", "娱乐"),
new Video("2", "娱乐"),
new Video("3", "科技"),
new Video("4", "美食"),
new Video("5", "科技"),
new Video("6", "美食"),
new Video("7", "旅游"),
new Video("8", "科技")
);
}
static class Video {
private String id;
private String tag;
public Video(String id, String tag) {
this.id = id;
this.tag = tag;
}
public String getId() {
return id;
}
public String getTag() {
return tag;
}
}
}
和用户服务一样,启动后查看eureka
控制台和测试API
调用,如下:
创建推荐服务
依然是先进行依赖和配置文件的编写,如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.itasass</groupId>
<artifactId>recommendation-system</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>recommendation-service</artifactId>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Spring Boot Web Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Eureka Client -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!-- Feign Client -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!-- Spring Boot Redis Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Jackson Databind (用于 Redis 的序列化/反序列化) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
</project>
server:
port: 8083 # 服务运行的端口
spring:
application:
name: recommendation-service # 推荐服务的名称
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/ # Eureka Server 的地址
fetch-registry: true # 从 Eureka 拉取服务注册表
register-with-eureka: true # 将自己注册到 Eureka
由于推荐服务 需要调用 用户服务 和 视频服务 来获取用户的历史行为和视频列表。我们可以使用 Feign 客户端来简化服务之间的调用。因此,我们需要建立一个client目录来存放客户端接口,如下:
两个客户端接口都是模拟获取数据,如下:
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import java.util.List;
@FeignClient(name = "user-service")
public interface UserServiceClient {
@GetMapping("/users/{userId}/history")
List<String> getUserHistory(@PathVariable("userId") String userId);
}
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import java.util.List;
@FeignClient(name = "video-service") // 指定服务名称为 video-service
public interface VideoServiceClient {
@GetMapping("/videos")
List<Video> getAllVideos();
class Video {
private String id;
private String tag;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
}
}
当然,别忘了推荐服务是根据用户画像来推荐视频,所以这里我们还需要从redis中获取用户画像,那么我们声明一个redis的工具类来获取用户画像,如下:
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Repository;
import java.util.Map;
@Repository
public class RedisUserProfileRepository {
@Autowired
private StringRedisTemplate redisTemplate; // 用于操作 Redis
@Autowired
private ObjectMapper objectMapper; // 用于 JSON 序列化/反序列化
private static final String USER_PROFILE_KEY_PREFIX = "user_profile:";
/**
* 从 Redis 中获取用户画像
* @param userId 用户 ID
* @return 用户画像(Map结构)
*/
public Map<String, Object> getUserProfile(String userId) throws JsonProcessingException {
String key = USER_PROFILE_KEY_PREFIX + userId;
String jsonProfile = redisTemplate.opsForValue().get(key); // 从 Redis 获取用户画像(JSON 字符串)
if (jsonProfile != null) {
return objectMapper.readValue(jsonProfile, Map.class); // 将 JSON 转为 Map
}
return null; // 如果 Redis 中没有用户画像,返回 null
}
}
接着我们在主类里写推荐服务的主要逻辑,大家可以直接看代码的注释,根据用户画像进行推荐,没有则全部返回,如下:
import com.recommendation.client.VideoServiceClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@RestController
public class RecommendationServiceApplication {
public static void main( String[] args ) {
SpringApplication.run(RecommendationServiceApplication.class, args);
}
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private VideoServiceClient videoServiceClient;
@GetMapping("/recommendations/{userId}")
public List<VideoServiceClient.Video> getRecommendations(@PathVariable("userId") String userId) {
//从Redis获取用户画像,包括兴趣标签和观看历史
String userProfileKey = "user:" + userId;
//获取用户的兴趣标签
Set<String> userInterests = redisTemplate.opsForSet().members(userProfileKey + ":interests");
//获取用户的观看历史
List<String> userHistory = redisTemplate.opsForList().range(userProfileKey + ":history", 0, -1);
//如果用户画像不存在或用户没有兴趣标签,返回默认推荐列表
if (userInterests.isEmpty() || userHistory.isEmpty()) {
return videoServiceClient.getAllVideos();
}
//获取所有可用的视频列表
List<VideoServiceClient.Video> allVideos = videoServiceClient.getAllVideos();
//根据用户画像中的兴趣标签,推荐符合兴趣且用户未看过的视频
return allVideos.stream()
//筛选出用户未看过的视频
.filter(video -> !userHistory.contains(video.getId()))
//只推荐与用户画像中的兴趣标签匹配的视频
.filter(video -> userInterests.contains(video.getTag()))
.collect(Collectors.toList());
}
}
创建数据处理服务
数据处理服务是整个系统运转的关键,一方面它需要接收用户行为数据,另一方面还需要对用户行为数据进行处理去更新用户画像,所以这个服务的主要逻辑也是围绕它们展开。老规矩,依然是先补充依赖和配置文件,如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.itasass</groupId>
<artifactId>recommendation-system</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>data-processing-service</artifactId>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Spring Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Eureka Client -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
</dependencies>
</project>
server:
port: 8084 # 服务运行的端口
spring:
kafka:
bootstrap-servers: localhost:9092 # Kafka broker 地址
consumer:
group-id: data-processing-group # Kafka 消费者组 ID
auto-offset-reset: earliest # 消费者从最早的消息开始读取
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
redis:
host: 127.0.0.1
port: 6379
接下来,通过一个kafka的监听接口来实现上述功能,我这里演示就随意了,大家用MQ传递消息还是用JSON格式,否则你解析很容易出问题,如下:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Service
public class KafkaConsumerService {
@Autowired
private DataProcessingService dataProcessingService;
// 监听 Kafka 主题,消费用户行为数据
@KafkaListener(topics = "user-behavior-topic", groupId = "data-processing-group")
public void consumeUserBehavior(String message) {
try {
System.out.println("Received message: " + message);
// 使用正则表达式提取信息
String userId = extractValue(message, "User:(\d+)");
String videoId = extractValue(message, "Video:(\d+)");
String videoTag = extractValue(message, "Tag:([^\]]+)");
if (userId != null && videoId != null && videoTag != null) {
dataProcessingService.processUserBehavior(userId, videoId, videoTag);
} else {
System.err.println("Failed to parse message: " + message);
}
} catch (Exception e) {
System.err.println("Error processing message: " + message);
e.printStackTrace();
}
}
private String extractValue(String message, String pattern) {
Pattern r = Pattern.compile(pattern);
Matcher m = r.matcher(message);
return m.find() ? m.group(1) : null;
}
}
在数据处理服务中,我们需要处理消息并更新用户画像,如下:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class DataProcessingService {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String USER_PROFILE_KEY_PREFIX = "user:";
private static final String USER_HISTORY_KEY = "history";
private static final String USER_INTERESTS_KEY = "interests";
/**
* 处理用户行为数据:更新 Redis 中的用户画像
* @param userId 用户 ID
* @param videoId 视频 ID
* @param videoTag 视频的标签
*/
public void processUserBehavior(String userId, String videoId, String videoTag) {
//1.更新用户的观看历史记录
updateUserHistory(userId, videoId);
//2.更新用户的兴趣标签
updateUserInterests(userId, videoTag);
}
/**
* 更新用户的观看历史记录
* @param userId 用户 ID
* @param videoId 视频 ID
*/
private void updateUserHistory(String userId, String videoId) {
String userProfileKey = USER_PROFILE_KEY_PREFIX + userId;
redisTemplate.opsForList()
.leftPush(userProfileKey + ":" + USER_HISTORY_KEY, videoId);
}
/**
* 更新用户的兴趣标签
* @param userId 用户 ID
* @param tag 视频的标签
*/
private void updateUserInterests(String userId, String tag) {
String userProfileKey = USER_PROFILE_KEY_PREFIX + userId;
redisTemplate.opsForSet()
.add(userProfileKey + ":" + USER_INTERESTS_KEY, tag);
}
}
至此,我们的这个简版推荐系统就搭建完成了,可以看到eureka
的控制台上四大业务服务都已经注册到了,如下:
五、业务测试
完成了系统的开发后,我们现在开始测试,首先我们先访问推荐服务,按照我们的逻辑:由于是第一次访问还没有形成画像,此时应该返回所有视频,如下:
可以看到此时返回了我们模拟的所有8
个视频,那么假定我这个用户对科技类
视频感兴趣,假如我看了id为3
的视频并点赞了,那么下次推荐服务应该只返回id=5和8
的视频。我们先去调用一次记录用户观看视频的行为的接口,如下:
然后我们去看下reids里是否已经有用户画像了,如下:
那么这时候我们再去调用推荐服务,就会得到我们上面期望的结果了,如下:
我们的推荐系统通过多个服务的协作成功实现了预期功能,但是按照现在的实现,如果其中一个服务挂了,整个推荐系统就不可用了,显然这是我们不期望发生的,所以我们就需要考虑做服务降级,确保即使某个服务出问题,不影响整个系统的使用。
六、服务降级
我们在推荐服务的主类接口里有调用redis
来获取用户画像,就以它为例,因为我们用的springcloud
,就直接用配套的resilience4j
来实现,依然是先引入依赖和补充配置,如下:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-circuitbreaker-resilience4j</artifactId>
</dependency>
resilience4j:
circuitbreaker:
instances:
redisService: # 熔断器名称,与 @CircuitBreaker 注解中的 name 属性对应
slidingWindowSize: 10 # 熔断器的滑动窗口大小,用于计算失败率的调用数量
failureRateThreshold: 50 # 失败率阈值,50% 代表当失败率达到 50% 时开启熔断
waitDurationInOpenState: 10000 # 熔断器在打开状态下保持的时间(毫秒),即熔断器打开后等待 10 秒再尝试进入半开状态
permittedNumberOfCallsInHalfOpenState: 3 # 半开状态下允许的最大调用次数,用于测试服务是否恢复
minimumNumberOfCalls: 5 # 统计失败率所需的最小调用次数,至少进行 5 次调用后熔断器才会判断是否开启熔断
automaticTransitionFromOpenToHalfOpenEnabled: true # 允许自动从打开状态过渡到半开状态,尝试恢复服务
slowCallDurationThreshold: 2000 # 慢调用的阈值,2 秒内完成的调用视为正常,超过 2 秒的调用视为慢调用
slowCallRateThreshold: 50 # 慢调用比例阈值,50% 代表当慢调用比例达到 50% 时开启熔断
然后我们改造原推荐接口,加上注解和降级方法,主要逻辑是当redis连接不上时,直接调视频服务的接口获取所有视频返回,代码如下:
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private VideoServiceClient videoServiceClient;
private static final String REDIS_BACKEND = "redisService"; // 定义熔断器名称
@GetMapping("/recommendations/{userId}")
@CircuitBreaker(name = REDIS_BACKEND, fallbackMethod = "getAllVideosFallback") // 添加熔断器
public List<VideoServiceClient.Video> getRecommendations(@PathVariable("userId") String userId) {
//从Redis获取用户画像,包括兴趣标签和观看历史
String userProfileKey = "user:" + userId;
//获取用户的兴趣标签
Set<String> userInterests = redisTemplate.opsForSet().members(userProfileKey + ":interests");
//获取用户的观看历史
List<String> userHistory = redisTemplate.opsForList().range(userProfileKey + ":history", 0, -1);
//如果用户画像不存在或用户没有兴趣标签,返回默认推荐列表
if (userInterests.isEmpty() || userHistory.isEmpty()) {
return videoServiceClient.getAllVideos();
}
//获取所有可用的视频列表
List<VideoServiceClient.Video> allVideos = videoServiceClient.getAllVideos();
//根据用户画像中的兴趣标签,推荐符合兴趣且用户未看过的视频
return allVideos.stream()
//筛选出用户未看过的视频
.filter(video -> !userHistory.contains(String.valueOf(video.getId())))
//只推荐与用户画像中的兴趣标签匹配的视频
.filter(video -> userInterests.contains(String.valueOf(video.getTag())))
.collect(Collectors.toList());
}
// 降级方法,当 Redis 连接失败时调用此方法
public List<VideoServiceClient.Video> getAllVideosFallback(Throwable ex) {
// 打印日志,记录异常信息
System.err.println("Redis is unavailable. Returning fallback recommendation. Exception: " + ex.getMessage());
// 在降级方法中调用 Feign 客户端返回默认的视频列表
return videoServiceClient.getAllVideos();
}
}
然后我们把Redis停掉,如下:
这时候我们再调用推荐接口,分别观察控制台有输出报错信息提示我们redis连接失败以及接口返回了所有视频数据,如下:
我们这里模拟了redis挂掉的情况,但其实熔断降级机制只要是需要调用别的服务的地方,我们都可以做相关处理来提升可用性。
七、小结
本文基于Spring Cloud
实现了一个简单的推荐系统,当然真实的推荐系统远比这复杂,涉及到很多算法,例如协同过滤、用户行为等等。这篇文章的重点是带大家建立微服务的设计思想:怎么围绕业务去做服务拆解,怎么通过Spring Cloud
去构建微服务项目以及服务的熔断降级处理,并最终希望让大家在看完后能够有所收获!