微服务架构设计与实践:用Spring Cloud实现抖音的推荐系统

简介: 本文基于Spring Cloud实现了一个简化的抖音推荐系统,涵盖用户行为管理、视频资源管理、个性化推荐和实时数据处理四大核心功能。通过Eureka进行服务注册与发现,使用Feign实现服务间调用,并借助Redis缓存用户画像,Kafka传递用户行为数据。文章详细介绍了项目搭建、服务创建及配置过程,包括用户服务、视频服务、推荐服务和数据处理服务的开发步骤。最后,通过业务测试验证了系统的功能,并引入Resilience4j实现服务降级,确保系统在部分服务故障时仍能正常运行。此示例旨在帮助读者理解微服务架构的设计思路与实践方法。

一、引子

抖音的推荐系统是其成功的关键之一,而背后是一套复杂的微服务架构支撑着高并发和庞大的用户数据处理。每当用户刷到新的视频时,背后都有一个复杂的推荐算法在实时运行。而在这样的场景下,构建一个高效、可扩展的微服务架构是至关重要的。本文将通过 Spring Cloud 构建一个简化版的抖音推荐系统,探讨微服务架构的设计与实践。

二、业务梳理

在正式的开发前,我们需要先对这个简化版的推荐系统所需要的功能做下梳理:

用户行为数据

推荐系统的核心在于个性化推荐,而个性化推荐的前提是对用户行为的全面了解。用户的每一次操作(如观看点赞转发评论等)都会影响推荐结果。因此,系统需要具备以下功能:

  • 记录用户行为数据:记录用户在平台上与视频的交互行为(比如用户观看了哪些视频、点赞了哪些视频等)。
  • 管理用户画像:基于用户的历史行为,生成用户的兴趣画像,用于推荐计算。

视频资源管理

抖音作为一个短视频平台,需要管理大量的视频资源。每个视频都有不同的标签(如类型话题风格等),这些标签是推荐算法的重要依据。因此,系统需要:

  • 存储视频基本信息:包括视频的ID、标题、标签、上传时间等。
  • 提供视频分类:根据视频的标签信息,将视频分类以便后续推荐。

个性化推荐

推荐系统的核心功能就是根据用户的兴趣和视频内容的标签,生成个性化推荐列表。为了实现这一功能,系统需要:

  • 获取用户画像和视频标签:结合用户的兴趣画像与视频的标签,匹配用户可能感兴趣的视频。
  • 生成推荐列表:根据算法计算,生成个性化推荐的视频列表并返回给用户。

用户行为数据的实时处理

用户在平台上的行为是实时发生的,因此推荐系统需要能够实时处理这些行为数据,并根据最新的行为更新用户画像。为此,系统需要:

  • 实时处理用户行为:当用户进行某个操作时(如点赞或观看某个视频),系统能够实时接收这些事件,并更新用户画像。
  • 异步处理:为了不影响用户的使用体验,行为数据的处理应尽量异步化,通过消息队列等手段解耦实时数据处理与推荐服务。

通过上述的业务需求梳理,我们最终可以总结出一个简化版的推荐系统需要具备的核心功能:

  • 用户行为管理:记录用户的观看、点赞等行为。
  • 视频资源管理:存储视频的基本信息和标签。
  • 个性化推荐:结合用户画像和视频标签,生成推荐列表。
  • 实时数据处理:处理用户行为数据,实时更新用户画像。

三、架构设计

完成了需求梳理后,我们总结了四大核心功能,进而可以抽象出四个服务来分别完成上述功能,因此,我们可以简单地绘制下业务架构图,如下:
01.png

这里我做了一些简化:比如省略了API网关,客户端直接请求到推荐服务上获取响应。因为我们决定使用SpringCloud来搭建这个项目,所以我们的服务都注册到Eureka上,服务之间的调用则采用Feign实现。另外,分别使用RedisKafka来缓存用户画像和传递用户行为数据。

四、具体实现

通过需求梳理、技术选型、架构设计,我们已经完成了项目开发前的准备工作,现在就可以正式进行开发了。首先需要我们根据业务架构图完成项目的基础搭建。

项目搭建

使用IDEA通过maven-archetype-quickstart进行快速创建,如下:
02.png
接下来通过同样的方式分别创建四大业务模块和Eureka服务模块,项目结构如下:
03.png
此时,在父模块的pom.xml文件中就可以看到子模块都已经被管理起来,我们再引入SpringBoot和
SpringCloud的依赖(Ps:版本可根据喜好自行选择,我这里演示也无所谓了,就不用SpringBoot3了),如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.itasass</groupId>
    <artifactId>recommendation-system</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <modules>
        <module>eureka-server</module>
        <module>user-service</module>
        <module>video-service</module>
        <module>recommendation-service</module>
        <module>data-processing-service</module>
    </modules>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.boot.version>2.6.4</spring.boot.version>
        <spring.cloud.version>2021.0.1</spring.cloud.version>
    </properties>

    <dependencyManagement>

        <dependencies>

            <!-- Spring Boot Dependencies-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <!-- Spring Cloud Dependencies-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring.cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

        </dependencies>

    </dependencyManagement>

</project>

此时,我们就完成了项目的基础搭建,接下来开始编写各个服务的代码。

创建 Eureka 服务

各个服务都有自己的依赖需要导入,Eureka服务所需依赖如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>eureka-server</artifactId>
    <packaging>jar</packaging>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>

        <!-- Spring Boot Web Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Server -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>

    </dependencies>

</project>

导入相关依赖后,可以创建 eureka-server 模块中的主类 EurekaServerApplication,如下:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication
{
   
    public static void main( String[] args ) {
   
        SpringApplication.run(EurekaServerApplication.class, args);
    }
}

然后我们需要对eureka服务进行配置,在 src/main/resources 目录下创建 application.yml如下:

server:
  port: 8761 # 服务运行的端口

eureka:
  client:
    register-with-eureka: false
    fetch-registry: false
  server:
    enable-self-preservation: false

完成了上述配置后,我们需要启动Eureka来看看当前的这个服务是否可以正常运行,也很简单,启动 EurekaServerApplication,访问 http://localhost:8761,此时可以看到 Eureka Server 的控制台界面,如下:
04.png

创建用户服务

依然是先导入依赖,如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>user-service</artifactId>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Spring Boot Web Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

同样地需要配置application.yml,四大业务模块都需要将自己注册到eureka中,如下:

server:
  port: 8081  # 服务运行的端口


spring:
  application:
    name: user-service  # 用户服务的名称
  kafka:
    bootstrap-servers: localhost:9092  # Kafka 服务器地址
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/  # Eureka Server 的地址
    fetch-registry: true  # 从 Eureka 拉取服务注册表
    register-with-eureka: true  # 将自己注册到 Eureka

同理,完善用户服务的主类,这里主要是模拟用户看过的视频,如下:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.Arrays;
import java.util.List;

@SpringBootApplication
@EnableEurekaClient
@RestController
public class UserServiceApplication {

    public static void main( String[] args ) {
        SpringApplication.run(UserServiceApplication.class, args);
    }

    @GetMapping("/users/{userId}/history")
    public List<String> getUserHistory(@PathVariable String userId) {
        // 模拟用户的历史行为数据,这里返回一些示例视频ID
        return Arrays.asList("1", "3", "5","7");
    }
}

这里说明下,我们本次的实例都采用模拟数据的思路,主要是让大家了解下设计思路,所以就不建立相关的库表了。启动用户服务后就可以在eureka控制台看到user服务了,如下:
05.png
另外,通过工具(如 Apifox)来访问用户服务的 REST API,如下:
06.png
完成了基础的配置,但别忘了在我们的架构设计中用户服务还有一项重任-记录用户行为数据。因此,我们需要在用户服务里编写Kafka的生产者服务,如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {
   

  private static final String TOPIC = "user-behavior-topic";  // Kafka 主题名称

  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  /**
   * 发送用户行为到 Kafka
   *
   * @param userId       用户ID
   * @param videoId      视频ID
   * @param videoTag     视频标签
   * @param isInterested 是否感兴趣 (0: 不感兴趣,1: 感兴趣)
   */
  public void sendUserBehavior(String userId, String videoId, String videoTag, int isInterested) {
   
    // 构建消息
    String message = String.format("User:%s watched Video:%s [Tag:%s] with interest:%d", userId, videoId, videoTag, isInterested);
    kafkaTemplate.send(TOPIC, message);
  }

}

同时,我们编写一个记录用户观看视频的行为的接口来模拟用户刷视频的场景,代码如下:

import com.recommendation.serive.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class UserController {
   

  @Autowired
  private KafkaProducerService kafkaProducerService;

  /**
   * 记录用户观看视频的行为,并发送到 Kafka
   * @param userId 用户 ID
   * @param videoId 视频 ID
   * @param videoTag 视频标签
   * @param isInterested 是否感兴趣 (0: 不感兴趣,1: 感兴趣)
   */
  @PostMapping("/users/{userId}/watch/{videoId}/{videoTag}/{isInterested}")
  public String watchVideo(@PathVariable String userId,
                           @PathVariable String videoId,
                           @PathVariable String videoTag,
                           @PathVariable int isInterested) {
   
    // 调用 Kafka 生产者服务将行为数据(包括视频标签)发送到 Kafka
    kafkaProducerService.sendUserBehavior(userId, videoId, videoTag, isInterested);

    return String.format("User %s watched video %s with tag %s and interest %d", userId, videoId, videoTag, isInterested);
  }
}

通过Apifox模拟数据进行调用,查看API的响应结果和Kafka来观察消息是否发送成功,如下:
07.png
08.png
从结果来看,我们的消息发送成功。

创建视频服务

同样地,依然是先完善依赖,如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>video-service</artifactId>
    <packaging>jar</packaging>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Spring Boot Web Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

接着配置application.yml文件,如下:

server:
  port: 8082  # 服务运行的端口

spring:
  application:
    name: video-service  # 视频服务的名称

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/  # Eureka Server 的地址
    fetch-registry: true  # 从 Eureka 拉取服务注册表
    register-with-eureka: true  # 将自己注册到 Eureka

接着又是主类,视频服务通过模拟提供了所有的视频,如下:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Arrays;
import java.util.List;


@SpringBootApplication
@EnableEurekaClient
@RestController
public class VideoServiceApplication {
   
    public static void main( String[] args ) {
   
        SpringApplication.run(VideoServiceApplication.class, args);
    }

    @GetMapping("/videos")
    public List<Video> getAllVideos() {
   
        // 模拟视频数据,这里返回一些示例视频
        return Arrays.asList(
                new Video("1", "娱乐"),
                new Video("2", "娱乐"),
                new Video("3", "科技"),
                new Video("4", "美食"),
                new Video("5", "科技"),
                new Video("6", "美食"),
                new Video("7", "旅游"),
                new Video("8", "科技")
        );
    }

    static class Video {
   
        private String id;
        private String tag;

        public Video(String id, String tag) {
   
            this.id = id;
            this.tag = tag;
        }

        public String getId() {
   
            return id;
        }

        public String getTag() {
   
            return tag;
        }
    }
}

和用户服务一样,启动后查看eureka控制台和测试API调用,如下:
09.png
10.png

创建推荐服务

依然是先进行依赖和配置文件的编写,如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>recommendation-service</artifactId>
    <packaging>jar</packaging>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Spring Boot Web Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <!-- Feign Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>

        <!-- Spring Boot Redis Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- Jackson Databind (用于 Redis 的序列化/反序列化) -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

    </dependencies>

</project>
server:
  port: 8083  # 服务运行的端口

spring:
  application:
    name: recommendation-service  # 推荐服务的名称

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/  # Eureka Server 的地址
    fetch-registry: true  # 从 Eureka 拉取服务注册表
    register-with-eureka: true  # 将自己注册到 Eureka

由于推荐服务 需要调用 用户服务视频服务 来获取用户的历史行为和视频列表。我们可以使用 Feign 客户端来简化服务之间的调用。因此,我们需要建立一个client目录来存放客户端接口,如下:
11.png

两个客户端接口都是模拟获取数据,如下:

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

import java.util.List;

@FeignClient(name = "user-service")
public interface UserServiceClient {
   

  @GetMapping("/users/{userId}/history")
  List<String> getUserHistory(@PathVariable("userId") String userId);
}
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;

import java.util.List;

@FeignClient(name = "video-service")  // 指定服务名称为 video-service
public interface VideoServiceClient {

  @GetMapping("/videos")
  List<Video> getAllVideos();

  class Video {
    private String id;
    private String tag;

    public String getId() {
      return id;
    }

    public void setId(String id) {
      this.id = id;
    }

    public String getTag() {
      return tag;
    }

    public void setTag(String tag) {
      this.tag = tag;
    }
  }
}

当然,别忘了推荐服务是根据用户画像来推荐视频,所以这里我们还需要从redis中获取用户画像,那么我们声明一个redis的工具类来获取用户画像,如下:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Repository;

import java.util.Map;


@Repository
public class RedisUserProfileRepository {
   

  @Autowired
  private StringRedisTemplate redisTemplate;  // 用于操作 Redis

  @Autowired
  private ObjectMapper objectMapper;  // 用于 JSON 序列化/反序列化

  private static final String USER_PROFILE_KEY_PREFIX = "user_profile:";

  /**
   * 从 Redis 中获取用户画像
   * @param userId 用户 ID
   * @return 用户画像(Map结构)
   */
  public Map<String, Object> getUserProfile(String userId) throws JsonProcessingException {
   
    String key = USER_PROFILE_KEY_PREFIX + userId;
    String jsonProfile = redisTemplate.opsForValue().get(key);  // 从 Redis 获取用户画像(JSON 字符串)
    if (jsonProfile != null) {
   
      return objectMapper.readValue(jsonProfile, Map.class);  // 将 JSON 转为 Map
    }
    return null;  // 如果 Redis 中没有用户画像,返回 null
  }
}

接着我们在主类里写推荐服务的主要逻辑,大家可以直接看代码的注释,根据用户画像进行推荐,没有则全部返回,如下:

import com.recommendation.client.VideoServiceClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@RestController
public class RecommendationServiceApplication {
   
    public static void main( String[] args ) {
   
        SpringApplication.run(RecommendationServiceApplication.class, args);
    }

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private VideoServiceClient videoServiceClient;

    @GetMapping("/recommendations/{userId}")
    public List<VideoServiceClient.Video> getRecommendations(@PathVariable("userId") String userId) {
   
        //从Redis获取用户画像,包括兴趣标签和观看历史
        String userProfileKey = "user:" + userId;
        //获取用户的兴趣标签
        Set<String> userInterests = redisTemplate.opsForSet().members(userProfileKey + ":interests");
        //获取用户的观看历史
        List<String> userHistory = redisTemplate.opsForList().range(userProfileKey + ":history", 0, -1);
        //如果用户画像不存在或用户没有兴趣标签,返回默认推荐列表
        if (userInterests.isEmpty() || userHistory.isEmpty()) {
   
            return videoServiceClient.getAllVideos();
        }
        //获取所有可用的视频列表
        List<VideoServiceClient.Video> allVideos = videoServiceClient.getAllVideos();
        //根据用户画像中的兴趣标签,推荐符合兴趣且用户未看过的视频
        return allVideos.stream()
                //筛选出用户未看过的视频
                .filter(video -> !userHistory.contains(video.getId()))
                //只推荐与用户画像中的兴趣标签匹配的视频
                .filter(video -> userInterests.contains(video.getTag()))
                .collect(Collectors.toList());
    }

}

创建数据处理服务

数据处理服务是整个系统运转的关键,一方面它需要接收用户行为数据,另一方面还需要对用户行为数据进行处理去更新用户画像,所以这个服务的主要逻辑也是围绕它们展开。老规矩,依然是先补充依赖和配置文件,如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>data-processing-service</artifactId>
    <packaging>jar</packaging>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>

        <!-- Spring Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- Jackson -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

    </dependencies>
</project>
server:
  port: 8084  # 服务运行的端口

spring:
  kafka:
    bootstrap-servers: localhost:9092  # Kafka broker 地址
    consumer:
      group-id: data-processing-group   # Kafka 消费者组 ID
      auto-offset-reset: earliest       # 消费者从最早的消息开始读取
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

  redis:
    host: 127.0.0.1
    port: 6379

接下来,通过一个kafka的监听接口来实现上述功能,我这里演示就随意了,大家用MQ传递消息还是用JSON格式,否则你解析很容易出问题,如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Service
public class KafkaConsumerService {
   

  @Autowired
  private DataProcessingService dataProcessingService;

  // 监听 Kafka 主题,消费用户行为数据
  @KafkaListener(topics = "user-behavior-topic", groupId = "data-processing-group")
  public void consumeUserBehavior(String message) {
   
    try {
   
      System.out.println("Received message: " + message);

      // 使用正则表达式提取信息
      String userId = extractValue(message, "User:(\d+)");
      String videoId = extractValue(message, "Video:(\d+)");
      String videoTag = extractValue(message, "Tag:([^\]]+)");

      if (userId != null && videoId != null && videoTag != null) {
   
        dataProcessingService.processUserBehavior(userId, videoId, videoTag);
      } else {
   
        System.err.println("Failed to parse message: " + message);
      }
    } catch (Exception e) {
   
      System.err.println("Error processing message: " + message);
      e.printStackTrace();
    }
  }

  private String extractValue(String message, String pattern) {
   
    Pattern r = Pattern.compile(pattern);
    Matcher m = r.matcher(message);
    return m.find() ? m.group(1) : null;
  }

}

在数据处理服务中,我们需要处理消息并更新用户画像,如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class DataProcessingService {

  @Autowired
  private StringRedisTemplate redisTemplate;

  private static final String USER_PROFILE_KEY_PREFIX = "user:";
  private static final String USER_HISTORY_KEY = "history";
  private static final String USER_INTERESTS_KEY = "interests";

  /**
   * 处理用户行为数据:更新 Redis 中的用户画像
   * @param userId 用户 ID
   * @param videoId 视频 ID
   * @param videoTag 视频的标签
   */
  public void processUserBehavior(String userId, String videoId, String videoTag) {
    //1.更新用户的观看历史记录
    updateUserHistory(userId, videoId);
    //2.更新用户的兴趣标签
    updateUserInterests(userId, videoTag);
  }

  /**
   * 更新用户的观看历史记录
   * @param userId 用户 ID
   * @param videoId 视频 ID
   */
  private void updateUserHistory(String userId, String videoId) {
    String userProfileKey = USER_PROFILE_KEY_PREFIX + userId;
    redisTemplate.opsForList()
            .leftPush(userProfileKey + ":" + USER_HISTORY_KEY, videoId);
  }

  /**
   * 更新用户的兴趣标签
   * @param userId 用户 ID
   * @param tag 视频的标签
   */
  private void updateUserInterests(String userId, String tag) {
    String userProfileKey = USER_PROFILE_KEY_PREFIX + userId;
    redisTemplate.opsForSet()
            .add(userProfileKey + ":" + USER_INTERESTS_KEY, tag);
  }

}

至此,我们的这个简版推荐系统就搭建完成了,可以看到eureka的控制台上四大业务服务都已经注册到了,如下:
12.png

五、业务测试

完成了系统的开发后,我们现在开始测试,首先我们先访问推荐服务,按照我们的逻辑:由于是第一次访问还没有形成画像,此时应该返回所有视频,如下:
13.png
可以看到此时返回了我们模拟的所有8个视频,那么假定我这个用户对科技类视频感兴趣,假如我看了id为3的视频并点赞了,那么下次推荐服务应该只返回id=5和8的视频。我们先去调用一次记录用户观看视频的行为的接口,如下:
14.png
然后我们去看下reids里是否已经有用户画像了,如下:
15.png
16.png
那么这时候我们再去调用推荐服务,就会得到我们上面期望的结果了,如下:
17.png
我们的推荐系统通过多个服务的协作成功实现了预期功能,但是按照现在的实现,如果其中一个服务挂了,整个推荐系统就不可用了,显然这是我们不期望发生的,所以我们就需要考虑做服务降级,确保即使某个服务出问题,不影响整个系统的使用。

六、服务降级

我们在推荐服务的主类接口里有调用redis来获取用户画像,就以它为例,因为我们用的springcloud,就直接用配套的resilience4j来实现,依然是先引入依赖和补充配置,如下:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-circuitbreaker-resilience4j</artifactId>
</dependency>
resilience4j:
  circuitbreaker:
    instances:
      redisService:  # 熔断器名称,与 @CircuitBreaker 注解中的 name 属性对应
        slidingWindowSize: 10  # 熔断器的滑动窗口大小,用于计算失败率的调用数量
        failureRateThreshold: 50  # 失败率阈值,50% 代表当失败率达到 50% 时开启熔断
        waitDurationInOpenState: 10000  # 熔断器在打开状态下保持的时间(毫秒),即熔断器打开后等待 10 秒再尝试进入半开状态
        permittedNumberOfCallsInHalfOpenState: 3  # 半开状态下允许的最大调用次数,用于测试服务是否恢复
        minimumNumberOfCalls: 5  # 统计失败率所需的最小调用次数,至少进行 5 次调用后熔断器才会判断是否开启熔断
        automaticTransitionFromOpenToHalfOpenEnabled: true  # 允许自动从打开状态过渡到半开状态,尝试恢复服务
        slowCallDurationThreshold: 2000  # 慢调用的阈值,2 秒内完成的调用视为正常,超过 2 秒的调用视为慢调用
        slowCallRateThreshold: 50  # 慢调用比例阈值,50% 代表当慢调用比例达到 50% 时开启熔断

然后我们改造原推荐接口,加上注解和降级方法,主要逻辑是当redis连接不上时,直接调视频服务的接口获取所有视频返回,代码如下:

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private VideoServiceClient videoServiceClient;

    private static final String REDIS_BACKEND = "redisService"; // 定义熔断器名称

    @GetMapping("/recommendations/{userId}")
    @CircuitBreaker(name = REDIS_BACKEND, fallbackMethod = "getAllVideosFallback") // 添加熔断器
    public List<VideoServiceClient.Video> getRecommendations(@PathVariable("userId") String userId) {
   
        //从Redis获取用户画像,包括兴趣标签和观看历史
        String userProfileKey = "user:" + userId;
        //获取用户的兴趣标签
        Set<String> userInterests = redisTemplate.opsForSet().members(userProfileKey + ":interests");
        //获取用户的观看历史
        List<String> userHistory = redisTemplate.opsForList().range(userProfileKey + ":history", 0, -1);
        //如果用户画像不存在或用户没有兴趣标签,返回默认推荐列表
        if (userInterests.isEmpty() || userHistory.isEmpty()) {
   
            return videoServiceClient.getAllVideos();
        }
        //获取所有可用的视频列表
        List<VideoServiceClient.Video> allVideos = videoServiceClient.getAllVideos();
        //根据用户画像中的兴趣标签,推荐符合兴趣且用户未看过的视频
        return allVideos.stream()
                //筛选出用户未看过的视频
                .filter(video -> !userHistory.contains(String.valueOf(video.getId())))
                //只推荐与用户画像中的兴趣标签匹配的视频
                .filter(video -> userInterests.contains(String.valueOf(video.getTag())))
                .collect(Collectors.toList());
    }

    // 降级方法,当 Redis 连接失败时调用此方法
    public List<VideoServiceClient.Video> getAllVideosFallback(Throwable ex) {
   
        // 打印日志,记录异常信息
        System.err.println("Redis is unavailable. Returning fallback recommendation. Exception: " + ex.getMessage());

        // 在降级方法中调用 Feign 客户端返回默认的视频列表
        return videoServiceClient.getAllVideos();
    }
}

然后我们把Redis停掉,如下:
18.png
这时候我们再调用推荐接口,分别观察控制台有输出报错信息提示我们redis连接失败以及接口返回了所有视频数据,如下:
19.png
20.png
我们这里模拟了redis挂掉的情况,但其实熔断降级机制只要是需要调用别的服务的地方,我们都可以做相关处理来提升可用性。

七、小结

本文基于Spring Cloud实现了一个简单的推荐系统,当然真实的推荐系统远比这复杂,涉及到很多算法,例如协同过滤、用户行为等等。这篇文章的重点是带大家建立微服务的设计思想:怎么围绕业务去做服务拆解,怎么通过Spring Cloud去构建微服务项目以及服务的熔断降级处理,并最终希望让大家在看完后能够有所收获!

目录
相关文章
|
15天前
|
XML Java 开发者
Spring底层架构核心概念解析
理解 Spring 框架的核心概念对于开发和维护 Spring 应用程序至关重要。IOC 和 AOP 是其两个关键特性,通过依赖注入和面向切面编程实现了高效的模块化和松耦合设计。Spring 容器管理着 Beans 的生命周期和配置,而核心模块为各种应用场景提供了丰富的功能支持。通过全面掌握这些核心概念,开发者可以更加高效地利用 Spring 框架开发企业级应用。
64 18
|
19天前
|
存储 缓存 关系型数据库
社交软件红包技术解密(六):微信红包系统的存储层架构演进实践
微信红包本质是小额资金在用户帐户流转,有发、抢、拆三大步骤。在这个过程中对事务有高要求,所以订单最终要基于传统的RDBMS,这方面是它的强项,最终订单的存储使用互联网行业最通用的MySQL数据库。支持事务、成熟稳定,我们的团队在MySQL上有长期技术积累。但是传统数据库的扩展性有局限,需要通过架构解决。
57 18
|
4天前
|
传感器 监控 安全
智慧工地云平台的技术架构解析:微服务+Spring Cloud如何支撑海量数据?
慧工地解决方案依托AI、物联网和BIM技术,实现对施工现场的全方位、立体化管理。通过规范施工、减少安全隐患、节省人力、降低运营成本,提升工地管理的安全性、效率和精益度。该方案适用于大型建筑、基础设施、房地产开发等场景,具备微服务架构、大数据与AI分析、物联网设备联网、多端协同等创新点,推动建筑行业向数字化、智能化转型。未来将融合5G、区块链等技术,助力智慧城市建设。
|
1月前
|
存储 消息中间件 小程序
转转平台IM系统架构设计与实践(一):整体架构设计
本文描述了转转IM为整个平台提供的支撑能力,给出了系统的整体架构设计,分析了系统架构的特性。
71 10
|
1月前
|
监控 JavaScript 数据可视化
建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
|
2月前
|
弹性计算 API 持续交付
后端服务架构的微服务化转型
本文旨在探讨后端服务从单体架构向微服务架构转型的过程,分析微服务架构的优势和面临的挑战。文章首先介绍单体架构的局限性,然后详细阐述微服务架构的核心概念及其在现代软件开发中的应用。通过对比两种架构,指出微服务化转型的必要性和实施策略。最后,讨论了微服务架构实施过程中可能遇到的问题及解决方案。
|
3月前
|
Cloud Native Devops 云计算
云计算的未来:云原生架构与微服务的革命####
【10月更文挑战第21天】 随着企业数字化转型的加速,云原生技术正迅速成为IT行业的新宠。本文深入探讨了云原生架构的核心理念、关键技术如容器化和微服务的优势,以及如何通过这些技术实现高效、灵活且可扩展的现代应用开发。我们将揭示云原生如何重塑软件开发流程,提升业务敏捷性,并探索其对企业IT架构的深远影响。 ####
77 3
|
3月前
|
Cloud Native 安全 数据安全/隐私保护
云原生架构下的微服务治理与挑战####
随着云计算技术的飞速发展,云原生架构以其高效、灵活、可扩展的特性成为现代企业IT架构的首选。本文聚焦于云原生环境下的微服务治理问题,探讨其在促进业务敏捷性的同时所面临的挑战及应对策略。通过分析微服务拆分、服务间通信、故障隔离与恢复等关键环节,本文旨在为读者提供一个关于如何在云原生环境中有效实施微服务治理的全面视角,助力企业在数字化转型的道路上稳健前行。 ####
|
2月前
|
Java 开发者 微服务
从单体到微服务:如何借助 Spring Cloud 实现架构转型
**Spring Cloud** 是一套基于 Spring 框架的**微服务架构解决方案**,它提供了一系列的工具和组件,帮助开发者快速构建分布式系统,尤其是微服务架构。
284 69
从单体到微服务:如何借助 Spring Cloud 实现架构转型
|
2月前
|
设计模式 负载均衡 监控
探索微服务架构下的API网关设计
在微服务的大潮中,API网关如同一座桥梁,连接着服务的提供者与消费者。本文将深入探讨API网关的核心功能、设计原则及实现策略,旨在为读者揭示如何构建一个高效、可靠的API网关。通过分析API网关在微服务架构中的作用和挑战,我们将了解到,一个优秀的API网关不仅要处理服务路由、负载均衡、认证授权等基础问题,还需考虑如何提升系统的可扩展性、安全性和可维护性。文章最后将提供实用的代码示例,帮助读者更好地理解和应用API网关的设计概念。
100 8